3624: Reduce the time to import a dump r=irevoire a=irevoire

When importing a dump, this PR does multiple things;
- Stops committing the changes between each task import
- Stop deserializing + serializing every bitmap for every task

Pros:
Importing 1M tasks in a dump went from 3m36 on my computer to 6s

Cons: We use slightly more memory, but since we’re using roaring bitmaps, that really shouldn’t be noticeable.

Fixes #3620

Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
bors[bot] 2023-03-29 13:40:25 +00:00 committed by GitHub
commit 7871d12025
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 158 additions and 112 deletions

View File

@ -31,6 +31,7 @@ mod uuid_codec;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32; pub type TaskId = u32;
use std::collections::HashMap;
use std::ops::{Bound, RangeBounds}; use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -43,7 +44,7 @@ pub use error::Error;
use file_store::FileStore; use file_store::FileStore;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env, RoTxn}; use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
use meilisearch_types::milli; use meilisearch_types::milli;
use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::update::IndexerConfig;
@ -883,115 +884,8 @@ impl IndexScheduler {
/// Register a new task coming from a dump in the scheduler. /// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running. /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_task( pub fn register_dumped_task(&mut self) -> Result<Dump> {
&mut self, Dump::new(self)
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
) -> Result<Task> {
// Currently we don't need to access the tasks queue while loading a dump thus I can block everything.
let mut wtxn = self.env.write_txn()?;
let content_uuid = match content_file {
Some(content_file) if task.status == Status::Enqueued => {
let (uuid, mut file) = self.create_update_file()?;
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
for doc in content_file {
builder.append_json_object(&doc?)?;
}
builder.into_inner()?;
file.persist()?;
Some(uuid)
}
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
// in case we try to open it later.
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
_ => None,
};
let task = Task {
uid: task.uid,
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
error: task.error,
canceled_by: task.canceled_by,
details: task.details,
status: task.status,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,
method,
documents_count,
allow_index_creation,
} => KindWithContent::DocumentAdditionOrUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
method,
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
documents_count,
allow_index_creation,
},
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
documents_ids,
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::DocumentClear => KindWithContent::DocumentClear {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::Settings { settings, is_deletion, allow_index_creation } => {
KindWithContent::SettingsUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
}
}
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
KindDump::TaskCancelation { query, tasks } => {
KindWithContent::TaskCancelation { query, tasks }
}
KindDump::TasksDeletion { query, tasks } => {
KindWithContent::TaskDeletion { query, tasks }
}
KindDump::DumpCreation { keys, instance_uid } => {
KindWithContent::DumpCreation { keys, instance_uid }
}
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
},
};
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?;
for index in task.indexes() {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
self.update_status(&mut wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})?;
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.insert(task.uid));
})?;
wtxn.commit()?;
self.wake_up.signal();
Ok(task)
} }
/// Create a new index without any associated task. /// Create a new index without any associated task.
@ -1218,6 +1112,156 @@ impl IndexScheduler {
} }
} }
pub struct Dump<'a> {
index_scheduler: &'a IndexScheduler,
wtxn: RwTxn<'a, 'a>,
indexes: HashMap<String, RoaringBitmap>,
statuses: HashMap<Status, RoaringBitmap>,
kinds: HashMap<Kind, RoaringBitmap>,
}
impl<'a> Dump<'a> {
pub(crate) fn new(index_scheduler: &'a mut IndexScheduler) -> Result<Self> {
// While loading a dump no one should be able to access the scheduler thus I can block everything.
let wtxn = index_scheduler.env.write_txn()?;
Ok(Dump {
index_scheduler,
wtxn,
indexes: HashMap::new(),
statuses: HashMap::new(),
kinds: HashMap::new(),
})
}
/// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_task(
&mut self,
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
) -> Result<Task> {
let content_uuid = match content_file {
Some(content_file) if task.status == Status::Enqueued => {
let (uuid, mut file) = self.index_scheduler.create_update_file()?;
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
for doc in content_file {
builder.append_json_object(&doc?)?;
}
builder.into_inner()?;
file.persist()?;
Some(uuid)
}
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
// in case we try to open it later.
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
_ => None,
};
let task = Task {
uid: task.uid,
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
error: task.error,
canceled_by: task.canceled_by,
details: task.details,
status: task.status,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,
method,
documents_count,
allow_index_creation,
} => KindWithContent::DocumentAdditionOrUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
method,
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
documents_count,
allow_index_creation,
},
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
documents_ids,
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::DocumentClear => KindWithContent::DocumentClear {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::Settings { settings, is_deletion, allow_index_creation } => {
KindWithContent::SettingsUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
}
}
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
KindDump::TaskCancelation { query, tasks } => {
KindWithContent::TaskCancelation { query, tasks }
}
KindDump::TasksDeletion { query, tasks } => {
KindWithContent::TaskDeletion { query, tasks }
}
KindDump::DumpCreation { keys, instance_uid } => {
KindWithContent::DumpCreation { keys, instance_uid }
}
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
},
};
self.index_scheduler.all_tasks.put(&mut self.wtxn, &BEU32::new(task.uid), &task)?;
for index in task.indexes() {
match self.indexes.get_mut(index) {
Some(bitmap) => {
bitmap.insert(task.uid);
}
None => {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(task.uid);
self.indexes.insert(index.to_string(), bitmap);
}
};
}
self.statuses.entry(task.status).or_insert(RoaringBitmap::new()).insert(task.uid);
self.kinds.entry(task.kind.as_kind()).or_insert(RoaringBitmap::new()).insert(task.uid);
Ok(task)
}
/// Commit all the changes and exit the importing dump state
pub fn finish(mut self) -> Result<()> {
for (index, bitmap) in self.indexes {
self.index_scheduler.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
}
for (status, bitmap) in self.statuses {
self.index_scheduler.put_status(&mut self.wtxn, status, &bitmap)?;
}
for (kind, bitmap) in self.kinds {
self.index_scheduler.put_kind(&mut self.wtxn, kind, &bitmap)?;
}
self.wtxn.commit()?;
self.index_scheduler.wake_up.signal();
Ok(())
}
}
/// The outcome of calling the [`IndexScheduler::tick`] function. /// The outcome of calling the [`IndexScheduler::tick`] function.
pub enum TickOutcome { pub enum TickOutcome {
/// The scheduler should immediately attempt another `tick`. /// The scheduler should immediately attempt another `tick`.

View File

@ -367,12 +367,14 @@ fn import_dump(
log::info!("All documents successfully imported."); log::info!("All documents successfully imported.");
} }
let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
// 4. Import the tasks. // 4. Import the tasks.
for ret in dump_reader.tasks()? { for ret in dump_reader.tasks()? {
let (task, file) = ret?; let (task, file) = ret?;
index_scheduler.register_dumped_task(task, file)?; index_scheduler_dump.register_dumped_task(task, file)?;
} }
Ok(()) Ok(index_scheduler_dump.finish()?)
} }
pub fn configure_data( pub fn configure_data(