Reduce the time to import a dump by caching some datas

With this commit, for a dump containing 1M tasks we went form 1m02 to 6s
This commit is contained in:
Tamo 2023-03-29 14:44:15 +02:00
parent cf5145b542
commit 3fb67f94f7

View File

@ -31,6 +31,7 @@ mod uuid_codec;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32; pub type TaskId = u32;
use std::collections::HashMap;
use std::ops::{Bound, RangeBounds}; use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -1114,6 +1115,10 @@ impl IndexScheduler {
pub struct Dump<'a> { pub struct Dump<'a> {
index_scheduler: &'a IndexScheduler, index_scheduler: &'a IndexScheduler,
wtxn: RwTxn<'a, 'a>, wtxn: RwTxn<'a, 'a>,
indexes: HashMap<String, RoaringBitmap>,
statuses: HashMap<Status, RoaringBitmap>,
kinds: HashMap<Kind, RoaringBitmap>,
} }
impl<'a> Dump<'a> { impl<'a> Dump<'a> {
@ -1121,7 +1126,13 @@ impl<'a> Dump<'a> {
// While loading a dump no one should be able to access the scheduler thus I can block everything. // While loading a dump no one should be able to access the scheduler thus I can block everything.
let wtxn = index_scheduler.env.write_txn()?; let wtxn = index_scheduler.env.write_txn()?;
Ok(Dump { index_scheduler, wtxn }) Ok(Dump {
index_scheduler,
wtxn,
indexes: HashMap::new(),
statuses: HashMap::new(),
kinds: HashMap::new(),
})
} }
/// Register a new task coming from a dump in the scheduler. /// Register a new task coming from a dump in the scheduler.
@ -1215,26 +1226,38 @@ impl<'a> Dump<'a> {
self.index_scheduler.all_tasks.put(&mut self.wtxn, &BEU32::new(task.uid), &task)?; self.index_scheduler.all_tasks.put(&mut self.wtxn, &BEU32::new(task.uid), &task)?;
for index in task.indexes() { for index in task.indexes() {
self.index_scheduler.update_index(&mut self.wtxn, index, |bitmap| { match self.indexes.get_mut(index) {
bitmap.insert(task.uid); Some(bitmap) => {
})?; bitmap.insert(task.uid);
}
None => {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(task.uid);
self.indexes.insert(index.to_string(), bitmap);
}
};
} }
self.statuses.entry(task.status).or_insert(RoaringBitmap::new()).insert(task.uid);
self.index_scheduler.update_status(&mut self.wtxn, task.status, |bitmap| { self.kinds.entry(task.kind.as_kind()).or_insert(RoaringBitmap::new()).insert(task.uid);
bitmap.insert(task.uid);
})?;
self.index_scheduler.update_kind(&mut self.wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.insert(task.uid));
})?;
Ok(task) Ok(task)
} }
/// Commit all the changes and exit the importing dump state /// Commit all the changes and exit the importing dump state
pub fn finish(self) -> Result<()> { pub fn finish(mut self) -> Result<()> {
for (index, bitmap) in self.indexes {
self.index_scheduler.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
}
for (status, bitmap) in self.statuses {
self.index_scheduler.put_status(&mut self.wtxn, status, &bitmap)?;
}
for (kind, bitmap) in self.kinds {
self.index_scheduler.put_kind(&mut self.wtxn, kind, &bitmap)?;
}
self.wtxn.commit()?; self.wtxn.commit()?;
self.index_scheduler.wake_up.signal(); self.index_scheduler.wake_up.signal();
Ok(()) Ok(())
} }
} }