Refactor semaphore to support atomic semop

This commit is contained in:
Yuke Peng
2024-09-02 12:57:49 +08:00
committed by Tate, Hongliang Tian
parent 9a2faff07b
commit 6ce50fab1c
4 changed files with 417 additions and 272 deletions

View File

@ -1,21 +1,19 @@
// SPDX-License-Identifier: MPL-2.0
use core::{
sync::atomic::{AtomicU16, AtomicU32, AtomicU64, Ordering},
slice::Iter,
sync::atomic::{AtomicU16, Ordering},
time::Duration,
};
use ostd::sync::{PreemptDisabled, Waiter, Waker};
use super::sem_set::SEMVMX;
use super::sem_set::{SemSetInner, SEMVMX};
use crate::{
ipc::{key_t, semaphore::system_v::sem_set::sem_sets, IpcFlags},
prelude::*,
process::{Pid, Process},
time::{
clocks::{RealTimeCoarseClock, JIFFIES_TIMER_MANAGER},
timer::Timeout,
},
process::Pid,
time::{clocks::JIFFIES_TIMER_MANAGER, timer::Timeout},
};
#[derive(Clone, Copy, Debug, Pod)]
@ -42,7 +40,7 @@ impl SemBuf {
#[repr(u16)]
#[derive(Debug, TryFromInt, Clone, Copy)]
enum Status {
pub enum Status {
Normal = 0,
Pending = 1,
Removed = 2,
@ -64,18 +62,36 @@ impl AtomicStatus {
}
}
struct PendingOp {
sem_buf: SemBuf,
/// Pending atomic semop.
pub struct PendingOp {
sops: Vec<SemBuf>,
status: Arc<AtomicStatus>,
waker: Arc<Waker>,
waker: Option<Arc<Waker>>,
pid: Pid,
process: Weak<Process>,
}
impl PendingOp {
    /// Returns an iterator over the individual operations (`SemBuf`s) that
    /// make up this semop.
    pub fn sops_iter(&self) -> Iter<'_, SemBuf> {
        let Self { sops, .. } = self;
        sops.iter()
    }

    /// Forwards `status` to the shared status cell of this operation.
    pub fn set_status(&self, status: Status) {
        let Self { status: cell, .. } = self;
        cell.set_status(status);
    }

    /// Returns a reference to the optional waker stored in this operation.
    pub fn waker(&self) -> &Option<Arc<Waker>> {
        let Self { waker, .. } = self;
        waker
    }

    /// Returns the PID recorded in this operation.
    pub fn pid(&self) -> Pid {
        let Self { pid, .. } = self;
        *pid
    }
}
impl Debug for PendingOp {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("PendingOp")
.field("sem_buf", &self.sem_buf)
.field("sops", &self.sops)
.field("status", &(self.status.status()))
.field("pid", &self.pid)
.finish()
@ -84,252 +100,268 @@ impl Debug for PendingOp {
#[derive(Debug)]
pub struct Semaphore {
val: SpinLock<i32>,
val: i32,
/// PID of the process that last modified semaphore.
/// - through semop with op != 0
/// - through semctl with SETVAL and SETALL
/// - through SEM_UNDO when task exit
latest_modified_pid: AtomicU32,
/// Pending alter operations. For each pending operation, it has `sem_op < 0`.
pending_alters: SpinLock<LinkedList<PendingOp>>,
/// Pending zeros operations. For each pending operation, it has `sem_op = 0`.
pending_const: SpinLock<LinkedList<PendingOp>>,
/// Last semop time.
sem_otime: AtomicU64,
latest_modified_pid: Pid,
}
impl Semaphore {
pub fn set_val(&self, val: i32, current_pid: Pid) -> Result<()> {
if !(0..SEMVMX).contains(&val) {
return_errno!(Errno::ERANGE);
}
let mut current_val = self.val.lock();
*current_val = val;
self.update_pending_ops(current_val, current_pid);
Ok(())
pub fn set_val(&mut self, val: i32) {
self.val = val;
}
pub fn val(&self) -> i32 {
*self.val.lock()
self.val
}
pub fn last_modified_pid(&self) -> Pid {
self.latest_modified_pid.load(Ordering::Relaxed)
pub fn set_latest_modified_pid(&mut self, pid: Pid) {
self.latest_modified_pid = pid;
}
pub fn sem_otime(&self) -> Duration {
Duration::from_secs(self.sem_otime.load(Ordering::Relaxed))
}
pub fn pending_zero_count(&self) -> usize {
self.pending_const.lock().len()
}
pub fn pending_alter_count(&self) -> usize {
self.pending_alters.lock().len()
}
/// Notifies the semaphore that the semaphore sets it belongs to have been removed.
pub(super) fn removed(&self) {
let mut pending_alters = self.pending_alters.lock();
for pending_alter in pending_alters.iter_mut() {
pending_alter.status.set_status(Status::Removed);
pending_alter.waker.wake_up();
}
pending_alters.clear();
let mut pending_const = self.pending_const.lock();
for pending_const in pending_const.iter_mut() {
pending_const.status.set_status(Status::Removed);
pending_const.waker.wake_up();
}
pending_const.clear();
pub fn latest_modified_pid(&self) -> Pid {
self.latest_modified_pid
}
pub(super) fn new(val: i32) -> Self {
Self {
val: SpinLock::new(val),
latest_modified_pid: AtomicU32::new(current!().pid()),
pending_alters: SpinLock::new(LinkedList::new()),
pending_const: SpinLock::new(LinkedList::new()),
sem_otime: AtomicU64::new(0),
val,
latest_modified_pid: current!().pid(),
}
}
fn update_otime(&self) {
self.sem_otime.store(
RealTimeCoarseClock::get().read_time().as_secs(),
Ordering::Relaxed,
);
}
fn sem_op(&self, sem_buf: &SemBuf, timeout: Option<Duration>, ctx: &Context) -> Result<()> {
let mut val = self.val.lock();
let sem_op = sem_buf.sem_op;
let current_pid = ctx.process.pid();
let flags = IpcFlags::from_bits_truncate(sem_buf.sem_flags as u32);
if flags.contains(IpcFlags::SEM_UNDO) {
todo!()
}
// Operate val
let positive_condition = sem_op.is_positive();
let negative_condition = sem_op.is_negative() && sem_op.abs() as i32 <= *val;
let zero_condition = sem_op == 0 && *val == 0;
if positive_condition || negative_condition {
let new_val = val
.checked_add(i32::from(sem_op))
.ok_or(Error::new(Errno::ERANGE))?;
if new_val > SEMVMX {
return_errno!(Errno::ERANGE);
}
*val = new_val;
self.update_pending_ops(val, current_pid);
return Ok(());
} else if zero_condition {
return Ok(());
}
drop(val);
// Need to wait for the semaphore
if flags.contains(IpcFlags::IPC_NOWAIT) {
return_errno!(Errno::EAGAIN);
}
// Add current to pending list
let (waiter, waker) = Waiter::new_pair();
let status = Arc::new(AtomicStatus::new(Status::Pending));
let pending_op = PendingOp {
sem_buf: *sem_buf,
status: status.clone(),
waker: waker.clone(),
process: ctx.posix_thread.weak_process(),
pid: current_pid,
};
if sem_op == 0 {
self.pending_const.lock().push_back(pending_op);
} else {
self.pending_alters.lock().push_back(pending_op);
}
// Wait
if let Some(timeout) = timeout {
let jiffies_timer = JIFFIES_TIMER_MANAGER.get().unwrap().create_timer(move || {
waker.wake_up();
});
jiffies_timer.set_timeout(Timeout::After(timeout));
}
waiter.wait();
// Check status and return
match status.status() {
Status::Normal => Ok(()),
Status::Removed => Err(Error::new(Errno::EIDRM)),
Status::Pending => {
let mut pending_ops = if sem_op == 0 {
self.pending_const.lock()
} else {
self.pending_alters.lock()
};
pending_ops.retain(|op| op.pid != current_pid);
Err(Error::new(Errno::EAGAIN))
}
}
}
/// Updates pending ops after the val changed.
fn update_pending_ops(&self, mut val: SpinLockGuard<i32, PreemptDisabled>, current_pid: Pid) {
debug_assert!(*val >= 0);
trace!("Updating pending ops, semaphore before: {:?}", *val);
// Two steps:
// 1. Remove the pending_alters with `sem_op < 0` if it can.
// 2. If val is equal to 0, then clear pending_const
// Step one:
let mut value = *val;
let mut latest_modified_pid = current_pid;
if value > 0 {
let mut pending_alters = self.pending_alters.lock();
let mut cursor = pending_alters.cursor_front_mut();
while let Some(op) = cursor.current() {
if value == 0 {
break;
}
// Check if the process alive.
if op.process.upgrade().is_none() {
cursor.remove_current().unwrap();
continue;
}
debug_assert!(op.sem_buf.sem_op < 0);
if op.sem_buf.sem_op.abs() as i32 <= value {
trace!(
"Found removable pending op, op: {:?}, pid:{:?}",
op.sem_buf.sem_op,
op.pid
);
value += i32::from(op.sem_buf.sem_op);
latest_modified_pid = op.pid;
op.status.set_status(Status::Normal);
op.waker.wake_up();
cursor.remove_current().unwrap();
} else {
cursor.move_next();
}
}
}
if latest_modified_pid != 0 {
self.latest_modified_pid
.store(latest_modified_pid, Ordering::Relaxed);
self.update_otime();
}
// Step two:
if value == 0 {
let mut pending_const = self.pending_const.lock();
pending_const.iter().for_each(|op| {
op.status.set_status(Status::Normal);
if op.process.upgrade().is_some() {
trace!("Found removable pending op, op: 0, pid:{:?}", op.pid);
op.waker.wake_up();
}
});
pending_const.clear();
}
*val = value;
trace!("Updated pending ops, semaphore after: {:?}", value);
}
}
pub fn sem_op(
sem_id: key_t,
sem_buf: &SemBuf,
sops: Vec<SemBuf>,
timeout: Option<Duration>,
ctx: &Context,
) -> Result<()> {
debug_assert!(sem_id > 0);
debug!("[semop] sembuf: {:?}", sem_buf);
debug!("[semop] sops: {:?}", sops);
let sem = {
let sem_sets = sem_sets();
let sem_set = sem_sets.get(&sem_id).ok_or(Error::new(Errno::EINVAL))?;
// TODO: Support permission check
warn!("Semaphore operation doesn't support permission check now");
sem_set
.get(sem_buf.sem_num as usize)
.ok_or(Error::new(Errno::EFBIG))?
.clone()
let pid = ctx.process.pid();
let mut pending_op = PendingOp {
sops,
status: Arc::new(AtomicStatus::new(Status::Pending)),
waker: None,
pid,
};
sem.sem_op(sem_buf, timeout, ctx)
// TODO: Support permission check
warn!("Semaphore operation doesn't support permission check now");
let (alter, dupsop) = get_sops_flags(&pending_op);
if dupsop {
warn!("Found duplicate sop");
}
let local_sem_sets = sem_sets();
let sem_set = local_sem_sets
.get(&sem_id)
.ok_or(Error::new(Errno::EINVAL))?;
let mut inner = sem_set.inner();
if perform_atomic_semop(&mut inner.sems, &mut pending_op)? {
if alter {
let wake_queue = do_smart_update(&mut inner, &pending_op);
for wake_op in wake_queue {
wake_op.set_status(Status::Normal);
if let Some(waker) = wake_op.waker {
waker.wake_up();
}
}
}
sem_set.update_otime();
return Ok(());
}
// Prepare to wait
let status = pending_op.status.clone();
let (waiter, waker) = Waiter::new_pair();
// Check if timeout exists to avoid calling `Arc::clone()`
if let Some(timeout) = timeout {
pending_op.waker = Some(waker.clone());
let jiffies_timer = JIFFIES_TIMER_MANAGER.get().unwrap().create_timer(move || {
waker.wake_up();
});
jiffies_timer.set_timeout(Timeout::After(timeout));
} else {
pending_op.waker = Some(waker);
}
if alter {
inner.pending_alter.push_back(pending_op);
} else {
inner.pending_const.push_back(pending_op);
}
drop(inner);
drop(local_sem_sets);
waiter.wait();
match status.status() {
Status::Normal => Ok(()),
Status::Removed => Err(Error::new(Errno::EIDRM)),
Status::Pending => {
// FIXME: Getting sem_sets may be time-consuming.
let sem_sets = sem_sets();
let sem_set = sem_sets.get(&sem_id).ok_or(Error::new(Errno::EINVAL))?;
let mut inner = sem_set.inner();
let pending_ops = if alter {
&mut inner.pending_alter
} else {
&mut inner.pending_const
};
pending_ops.retain(|op| op.pid != pid);
Err(Error::new(Errno::EAGAIN))
}
}
}
/// Re-evaluates the pending const and alter operations of a semaphore set
/// after `pending_op` has been performed, ref: <https://elixir.bootlin.com/linux/v6.0.9/source/ipc/sem.c#L1029>
///
/// Returns the list of pending operations that were completed during this
/// update. This function neither sets their status nor wakes them; it only
/// collects them.
pub(super) fn do_smart_update(
    inner: &mut SpinLockGuard<SemSetInner, PreemptDisabled>,
    pending_op: &PendingOp,
) -> LinkedList<PendingOp> {
    let (sems, alter_queue, const_queue) = inner.field_mut();
    let mut completed = LinkedList::new();

    // First give the wait-for-zero operations a chance.
    let has_const_waiters = !const_queue.is_empty();
    if has_const_waiters {
        do_smart_wakeup_zero(sems, const_queue, pending_op, &mut completed);
    }

    // Then retry the pending alter operations, if there are any.
    if alter_queue.is_empty() {
        return completed;
    }
    update_pending_alter(sems, alter_queue, const_queue, &mut completed);
    completed
}
/// Look for pending alter operations that can be completed, ref: <https://elixir.bootlin.com/linux/v6.0.9/source/ipc/sem.c#L949>
///
/// Every operation in `pending_alter` is retried with `perform_atomic_semop`.
/// An operation that succeeds is unlinked from `pending_alter`, the pending
/// const operations are re-checked via `do_smart_wakeup_zero`, and the
/// operation is moved to `wake_queue`.
///
/// A completed operation changes semaphore values, which may make an
/// operation *earlier* in the queue (already skipped in this pass)
/// satisfiable. Therefore the scan restarts from the front of the queue after
/// every completed operation, and only stops after a full pass in which no
/// operation could be completed. Each restart is preceded by the removal of
/// one element, so the loop terminates.
///
/// NOTE(review): an operation for which `perform_atomic_semop` returns `Err`
/// is left in the queue, exactly like one that still has to wait; the error
/// is not reported to the sleeper. Confirm whether that is intended.
pub(super) fn update_pending_alter(
    sems: &mut Box<[Semaphore]>,
    pending_alter: &mut LinkedList<PendingOp>,
    pending_const: &mut LinkedList<PendingOp>,
    wake_queue: &mut LinkedList<PendingOp>,
) {
    loop {
        let mut completed_any = false;

        let mut cursor = pending_alter.cursor_front_mut();
        while let Some(alter_op) = cursor.current() {
            if let Ok(true) = perform_atomic_semop(sems, alter_op) {
                let mut alter_op = cursor.remove_current_as_list().unwrap();
                do_smart_wakeup_zero(sems, pending_const, alter_op.front().unwrap(), wake_queue);
                wake_queue.append(&mut alter_op);
                // Semaphore values changed: rescan from the front so that
                // operations skipped earlier get another chance.
                completed_any = true;
                break;
            }
            cursor.move_next();
        }

        if !completed_any {
            return;
        }
    }
}
/// Wakes up wait-for-zero tasks, ref: <https://elixir.bootlin.com/linux/v6.0.9/source/ipc/sem.c#L893>
///
/// If at least one semaphore referenced by `pending_op` currently has the
/// value 0, all pending const operations are retried once through
/// `wake_const_ops`. Otherwise nothing happens.
///
/// Panics if `pending_op` references a semaphore index outside `sems`.
fn do_smart_wakeup_zero(
    sems: &mut Box<[Semaphore]>,
    pending_const: &mut LinkedList<PendingOp>,
    pending_op: &PendingOp,
    wake_queue: &mut LinkedList<PendingOp>,
) {
    let touched_a_zero = pending_op
        .sops_iter()
        .any(|sop| sems.get(sop.sem_num as usize).unwrap().val == 0);

    if touched_a_zero {
        wake_const_ops(sems, pending_const, wake_queue);
    }
}
/// Retries the pending const operations, ref: <https://elixir.bootlin.com/linux/v6.0.9/source/ipc/sem.c#L854>
///
/// Walks `pending_const` from front to back. Every operation for which
/// `perform_atomic_semop` reports success is unlinked and appended to
/// `wake_queue`; all other operations (still waiting, or failing with an
/// error) stay in `pending_const`.
pub(super) fn wake_const_ops(
    sems: &mut Box<[Semaphore]>,
    pending_const: &mut LinkedList<PendingOp>,
    wake_queue: &mut LinkedList<PendingOp>,
) {
    let mut cursor = pending_const.cursor_front_mut();
    loop {
        let Some(const_op) = cursor.current() else {
            break;
        };
        match perform_atomic_semop(sems, const_op) {
            Ok(true) => {
                // Removing the current element advances the cursor to the
                // next one, so no explicit `move_next` is needed here.
                let mut finished = cursor.remove_current_as_list().unwrap();
                wake_queue.append(&mut finished);
            }
            _ => cursor.move_next(),
        }
    }
}
/// Scans the sops of `pending_op` and returns the flags `(alter, dupsop)`.
///
/// - `alter` is `true` if at least one sop has `sem_op != 0`.
/// - `dupsop` is `true` if a sop targets a semaphore number that, modulo 64,
///   equals the number of an earlier sop with `sem_op != 0`. Because of the
///   modulo, this can also report a duplicate for distinct semaphore numbers.
fn get_sops_flags(pending_op: &PendingOp) -> (bool, bool) {
    // Bitmap (indexed by `sem_num % 64`) of semaphores altered so far.
    let mut altered_bits: u64 = 0;
    let mut has_alter = false;
    let mut has_dup = false;

    for sop in pending_op.sops_iter() {
        let bit: u64 = 1 << ((sop.sem_num) % 64);
        has_dup |= (altered_bits & bit) != 0;
        if sop.sem_op != 0 {
            altered_bits |= bit;
            has_alter = true;
        }
    }

    (has_alter, has_dup)
}
/// Perform atomic semop, ref: <https://elixir.bootlin.com/linux/v6.0.9/source/ipc/sem.c#L719>
/// 1. Return Ok(true) if the operation success.
/// 2. Return Ok(false) if the caller needs to wait.
/// 3. Return Err(err) if the operation cause error.
fn perform_atomic_semop(sems: &mut Box<[Semaphore]>, pending_op: &mut PendingOp) -> Result<bool> {
let mut result;
for op in pending_op.sops_iter() {
let sem = sems.get(op.sem_num as usize).ok_or(Errno::EFBIG)?;
let flags = IpcFlags::from_bits_truncate(op.sem_flags as u32);
result = sem.val();
// Zero condition
if op.sem_op == 0 && result != 0 {
if flags.contains(IpcFlags::IPC_NOWAIT) {
return_errno!(Errno::EAGAIN);
} else {
return Ok(false);
}
}
result += i32::from(op.sem_op);
if result < 0 {
if flags.contains(IpcFlags::IPC_NOWAIT) {
return_errno!(Errno::EAGAIN);
} else {
return Ok(false);
}
}
if result > SEMVMX {
return_errno!(Errno::ERANGE);
}
if flags.contains(IpcFlags::SEM_UNDO) {
todo!()
}
}
// Success, do operation
for op in pending_op.sops_iter() {
let sem = &mut sems[op.sem_num as usize];
if op.sem_op != 0 {
sem.val += i32::from(op.sem_op);
sem.latest_modified_pid = pending_op.pid;
}
}
Ok(true)
}