Add readahead for pagecache

This commit is contained in:
Shaocong Sun 2024-05-15 21:13:57 +08:00 committed by Tate, Hongliang Tian
parent 053f8d416e
commit 1b22267a87
2 changed files with 245 additions and 18 deletions

View File

@ -1,5 +1,7 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
#![allow(dead_code)]
use core::ops::Range; use core::ops::Range;
use aster_block::bio::{BioStatus, BioWaiter}; use aster_block::bio::{BioStatus, BioWaiter};
@ -85,9 +87,196 @@ impl Debug for PageCache {
} }
} }
struct ReadaheadWindow {
/// The window.
window: Range<usize>,
/// Look ahead position in the current window, where the readahead is triggered.
/// TODO: We set the `lookahead_index` to the start of the window for now.
/// This should be adjustable by the user.
lookahead_index: usize,
}
impl ReadaheadWindow {
pub fn new(window: Range<usize>) -> Self {
let lookahead_index = window.start;
Self {
window,
lookahead_index,
}
}
/// Gets the next readahead window.
/// Most of the time, we push the window forward and double its size.
///
/// The `max_size` is the maximum size of the window.
/// The `max_page` is the total page number of the file, and the window should not
/// exceed the scope of the file.
pub fn next(&self, max_size: usize, max_page: usize) -> Self {
let new_start = self.window.end;
let cur_size = self.window.end - self.window.start;
let new_size = (cur_size * 2).min(max_size).min(max_page - new_start);
Self {
window: new_start..(new_start + new_size),
lookahead_index: new_start,
}
}
pub fn lookahead_index(&self) -> usize {
self.lookahead_index
}
pub fn readahead_index(&self) -> usize {
self.window.end
}
pub fn readahead_range(&self) -> Range<usize> {
self.window.clone()
}
}
struct ReadaheadState {
/// Current readahead window.
ra_window: Option<ReadaheadWindow>,
/// Maximum window size.
max_size: usize,
/// The last page visited, used to determine sequential I/O.
prev_page: Option<usize>,
/// Readahead requests waiter.
waiter: BioWaiter,
}
impl ReadaheadState {
const INIT_WINDOW_SIZE: usize = 4;
const DEFAULT_MAX_SIZE: usize = 32;
pub fn new() -> Self {
Self {
ra_window: None,
max_size: Self::DEFAULT_MAX_SIZE,
prev_page: None,
waiter: BioWaiter::new(),
}
}
/// Sets the maximum readahead window size.
pub fn set_max_window_size(&mut self, size: usize) {
self.max_size = size;
}
fn is_sequential(&self, idx: usize) -> bool {
if let Some(prev) = self.prev_page {
idx == prev || idx == prev + 1
} else {
false
}
}
/// The number of bio requests in waiter.
/// This number will be zero if there are no previous readahead.
pub fn request_number(&self) -> usize {
self.waiter.nreqs()
}
/// Checks for the previous readahead.
/// Returns true if the previous readahead has been completed.
pub fn prev_readahead_is_completed(&self) -> bool {
let nreqs = self.request_number();
if nreqs == 0 {
return false;
}
for i in 0..nreqs {
if self.waiter.status(i) == BioStatus::Submit {
return false;
}
}
true
}
/// Waits for the previous readahead.
pub fn wait_for_prev_readahead(
&mut self,
pages: &mut MutexGuard<LruCache<usize, Page>>,
) -> Result<()> {
if matches!(self.waiter.wait(), Some(BioStatus::Complete)) {
let Some(window) = &self.ra_window else {
return_errno!(Errno::EINVAL)
};
for idx in window.readahead_range() {
if let Some(page) = pages.get_mut(&idx) {
page.set_state(PageState::UpToDate);
}
}
self.waiter.clear();
} else {
return_errno!(Errno::EIO)
}
Ok(())
}
/// Determines whether a new readahead should be performed.
/// We only consider readahead for sequential I/O now.
/// There should be at most one in-progress readahead.
pub fn should_readahead(&self, idx: usize, max_page: usize) -> bool {
if self.request_number() == 0 && self.is_sequential(idx) {
if let Some(cur_window) = &self.ra_window {
let trigger_readahead =
idx == cur_window.lookahead_index() || idx == cur_window.readahead_index();
let next_window_exist = cur_window.readahead_range().end < max_page;
trigger_readahead && next_window_exist
} else {
let new_window_start = idx + 1;
new_window_start < max_page
}
} else {
false
}
}
/// Setup the new readahead window.
pub fn setup_window(&mut self, idx: usize, max_page: usize) {
let new_window = if let Some(cur_window) = &self.ra_window {
cur_window.next(self.max_size, max_page)
} else {
let start_idx = idx + 1;
let init_size = Self::INIT_WINDOW_SIZE.min(self.max_size);
let end_idx = (start_idx + init_size).min(max_page);
ReadaheadWindow::new(start_idx..end_idx)
};
self.ra_window = Some(new_window);
}
/// Conducts the new readahead.
/// Sends the relevant read request and sets the relevant page in the page cache to `Uninit`.
pub fn conduct_readahead(
&mut self,
pages: &mut MutexGuard<LruCache<usize, Page>>,
backend: Arc<dyn PageCacheBackend>,
) -> Result<()> {
let Some(window) = &self.ra_window else {
return_errno!(Errno::EINVAL)
};
for async_idx in window.readahead_range() {
let mut async_page = Page::alloc()?;
let pg_waiter = backend.read_page(async_idx, async_page.frame())?;
self.waiter.concat(pg_waiter);
async_page.set_state(PageState::Uninit);
pages.put(async_idx, async_page);
}
Ok(())
}
/// Sets the last page visited.
pub fn set_prev_page(&mut self, idx: usize) {
self.prev_page = Some(idx);
}
}
struct PageCacheManager { struct PageCacheManager {
pages: Mutex<LruCache<usize, Page>>, pages: Mutex<LruCache<usize, Page>>,
backend: Weak<dyn PageCacheBackend>, backend: Weak<dyn PageCacheBackend>,
ra_state: Mutex<ReadaheadState>,
} }
impl PageCacheManager { impl PageCacheManager {
@ -95,6 +284,7 @@ impl PageCacheManager {
Self { Self {
pages: Mutex::new(LruCache::unbounded()), pages: Mutex::new(LruCache::unbounded()),
backend, backend,
ra_state: Mutex::new(ReadaheadState::new()),
} }
} }
@ -140,6 +330,55 @@ impl PageCacheManager {
Ok(()) Ok(())
} }
fn ondemand_readahead(&self, idx: usize) -> Result<Frame> {
let mut pages = self.pages.lock();
let mut ra_state = self.ra_state.lock();
let backend = self.backend();
// Checks for the previous readahead.
if ra_state.prev_readahead_is_completed() {
ra_state.wait_for_prev_readahead(&mut pages)?;
}
// There are three possible conditions that could be encountered upon reaching here.
// 1. The requested page is ready for read in page cache.
// 2. The requested page is in previous readahead range, not ready for now.
// 3. The requested page is on disk, need a sync read operation here.
let frame = if let Some(page) = pages.get(&idx) {
// Cond 1 & 2.
if let PageState::Uninit = page.state() {
// Cond 2: We should wait for the previous readahead.
// If there is no previous readahead, an error must have occurred somewhere.
if ra_state.request_number() == 0 {
return_errno!(Errno::EINVAL)
}
ra_state.wait_for_prev_readahead(&mut pages)?;
pages.get(&idx).unwrap().frame().clone()
} else {
// Cond 1.
page.frame().clone()
}
} else {
// Cond 3.
// Conducts the sync read operation.
let page = if idx < backend.npages() {
let mut page = Page::alloc()?;
backend.read_page_sync(idx, page.frame())?;
page.set_state(PageState::UpToDate);
page
} else {
Page::alloc_zero()?
};
let frame = page.frame().clone();
pages.put(idx, page);
frame
};
if ra_state.should_readahead(idx, backend.npages()) {
ra_state.setup_window(idx, backend.npages());
ra_state.conduct_readahead(&mut pages, backend)?;
}
ra_state.set_prev_page(idx);
Ok(frame)
}
} }
impl Debug for PageCacheManager { impl Debug for PageCacheManager {
@ -152,24 +391,7 @@ impl Debug for PageCacheManager {
impl Pager for PageCacheManager { impl Pager for PageCacheManager {
fn commit_page(&self, idx: usize) -> Result<Frame> { fn commit_page(&self, idx: usize) -> Result<Frame> {
if let Some(page) = self.pages.lock().get(&idx) { self.ondemand_readahead(idx)
return Ok(page.frame.clone());
}
// Multiple threads may commit the same page, but the result is ok.
let backend = self.backend();
let page = if idx < backend.npages() {
let mut page = Page::alloc()?;
backend.read_page_sync(idx, page.frame())?;
page.set_state(PageState::UpToDate);
page
} else {
Page::alloc_zero()?
};
// If multiple threads commit the same page, only the first allocated page will be perserved.
Ok(self.pages.lock().get_or_insert(idx, || page).frame.clone())
} }
fn update_page(&self, idx: usize) -> Result<()> { fn update_page(&self, idx: usize) -> Result<()> {

View File

@ -220,6 +220,11 @@ impl BioWaiter {
ret ret
} }
/// Clears all `Bio` requests in this waiter.
pub fn clear(&mut self) {
self.bios.clear();
}
} }
impl Default for BioWaiter { impl Default for BioWaiter {