Fix deadlock of CurrentTx in mlsdisk

This commit is contained in:
Qingsong Chen
2025-01-10 02:44:40 +00:00
committed by Tate, Hongliang Tian
parent c040df72b3
commit d8dc153be9
2 changed files with 32 additions and 13 deletions

View File

@ -46,33 +46,52 @@ impl<'a> CurrentTx<'a> {
/// If the returned value is `Ok`, then the TX is committed successfully. /// If the returned value is `Ok`, then the TX is committed successfully.
/// Otherwise, the TX is aborted. /// Otherwise, the TX is aborted.
pub fn commit(&self) -> Result<()> { pub fn commit(&self) -> Result<()> {
let mut tx_table = self.provider.tx_table.lock(); let mut tx_status = self
let Some(mut tx) = tx_table.remove(&CurrentThread::id()) else { .provider
panic!("there should be one Tx exited on the current thread"); .tx_table
}; .lock()
debug_assert!(tx.status() == TxStatus::Ongoing); .get(&CurrentThread::id())
.expect("there should be one Tx exited on the current thread")
.status();
debug_assert!(tx_status == TxStatus::Ongoing);
let res = self.provider.call_precommit_handlers(); let res = self.provider.call_precommit_handlers();
if res.is_ok() { if res.is_ok() {
self.provider.call_commit_handlers(); self.provider.call_commit_handlers();
tx.set_status(TxStatus::Committed); tx_status = TxStatus::Committed;
} else { } else {
self.provider.call_abort_handlers(); self.provider.call_abort_handlers();
tx.set_status(TxStatus::Aborted); tx_status = TxStatus::Aborted;
} }
let mut tx = self
.provider
.tx_table
.lock()
.remove(&CurrentThread::id())
.unwrap();
tx.set_status(tx_status);
res res
} }
/// Aborts the current TX. /// Aborts the current TX.
pub fn abort(&self) { pub fn abort(&self) {
let mut tx_table = self.provider.tx_table.lock(); let tx_status = self
let Some(mut tx) = tx_table.remove(&CurrentThread::id()) else { .provider
panic!("there should be one Tx exited on the current thread"); .tx_table
}; .lock()
debug_assert!(tx.status() == TxStatus::Ongoing); .get(&CurrentThread::id())
.expect("there should be one Tx exited on the current thread")
.status();
debug_assert!(tx_status == TxStatus::Ongoing);
self.provider.call_abort_handlers(); self.provider.call_abort_handlers();
let mut tx = self
.provider
.tx_table
.lock()
.remove(&CurrentThread::id())
.unwrap();
tx.set_status(TxStatus::Aborted); tx.set_status(TxStatus::Aborted);
} }

View File

@ -90,7 +90,7 @@ impl TxProvider {
let init_fn = initializer_map let init_fn = initializer_map
.get(&TypeId::of::<T>()) .get(&TypeId::of::<T>())
.unwrap() .unwrap()
.downcast_ref::<Box<dyn Fn() -> T>>() .downcast_ref::<Box<dyn Fn() -> T + Send + Sync>>()
.unwrap(); .unwrap();
init_fn() init_fn()
} }