mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-19 01:18:31 +08:00
Reduce the time to import a dump
With this commit, for a dump containing 1M tasks we went from 3m36s to import the task queue down to 1m02s
This commit is contained in:
parent
31bb61ba99
commit
cf5145b542
@ -43,7 +43,7 @@ pub use error::Error;
|
||||
use file_store::FileStore;
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{self, Database, Env, RoTxn};
|
||||
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::milli;
|
||||
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
@ -883,115 +883,8 @@ impl IndexScheduler {
|
||||
|
||||
/// Register a new task coming from a dump in the scheduler.
|
||||
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
|
||||
pub fn register_dumped_task(
|
||||
&mut self,
|
||||
task: TaskDump,
|
||||
content_file: Option<Box<UpdateFile>>,
|
||||
) -> Result<Task> {
|
||||
// Currently we don't need to access the tasks queue while loading a dump thus I can block everything.
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
|
||||
let content_uuid = match content_file {
|
||||
Some(content_file) if task.status == Status::Enqueued => {
|
||||
let (uuid, mut file) = self.create_update_file()?;
|
||||
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
|
||||
for doc in content_file {
|
||||
builder.append_json_object(&doc?)?;
|
||||
}
|
||||
builder.into_inner()?;
|
||||
file.persist()?;
|
||||
|
||||
Some(uuid)
|
||||
}
|
||||
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
|
||||
// in case we try to open it later.
|
||||
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let task = Task {
|
||||
uid: task.uid,
|
||||
enqueued_at: task.enqueued_at,
|
||||
started_at: task.started_at,
|
||||
finished_at: task.finished_at,
|
||||
error: task.error,
|
||||
canceled_by: task.canceled_by,
|
||||
details: task.details,
|
||||
status: task.status,
|
||||
kind: match task.kind {
|
||||
KindDump::DocumentImport {
|
||||
primary_key,
|
||||
method,
|
||||
documents_count,
|
||||
allow_index_creation,
|
||||
} => KindWithContent::DocumentAdditionOrUpdate {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
primary_key,
|
||||
method,
|
||||
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
|
||||
documents_count,
|
||||
allow_index_creation,
|
||||
},
|
||||
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
|
||||
documents_ids,
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
},
|
||||
KindDump::DocumentClear => KindWithContent::DocumentClear {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
},
|
||||
KindDump::Settings { settings, is_deletion, allow_index_creation } => {
|
||||
KindWithContent::SettingsUpdate {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
new_settings: settings,
|
||||
is_deletion,
|
||||
allow_index_creation,
|
||||
}
|
||||
}
|
||||
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
},
|
||||
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
primary_key,
|
||||
},
|
||||
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
primary_key,
|
||||
},
|
||||
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
|
||||
KindDump::TaskCancelation { query, tasks } => {
|
||||
KindWithContent::TaskCancelation { query, tasks }
|
||||
}
|
||||
KindDump::TasksDeletion { query, tasks } => {
|
||||
KindWithContent::TaskDeletion { query, tasks }
|
||||
}
|
||||
KindDump::DumpCreation { keys, instance_uid } => {
|
||||
KindWithContent::DumpCreation { keys, instance_uid }
|
||||
}
|
||||
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
|
||||
},
|
||||
};
|
||||
|
||||
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?;
|
||||
|
||||
for index in task.indexes() {
|
||||
self.update_index(&mut wtxn, index, |bitmap| {
|
||||
bitmap.insert(task.uid);
|
||||
})?;
|
||||
}
|
||||
|
||||
self.update_status(&mut wtxn, task.status, |bitmap| {
|
||||
bitmap.insert(task.uid);
|
||||
})?;
|
||||
|
||||
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
|
||||
(bitmap.insert(task.uid));
|
||||
})?;
|
||||
|
||||
wtxn.commit()?;
|
||||
self.wake_up.signal();
|
||||
|
||||
Ok(task)
|
||||
pub fn register_dumped_task(&mut self) -> Result<Dump> {
|
||||
Dump::new(self)
|
||||
}
|
||||
|
||||
/// Create a new index without any associated task.
|
||||
@ -1218,6 +1111,134 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Dump<'a> {
|
||||
index_scheduler: &'a IndexScheduler,
|
||||
wtxn: RwTxn<'a, 'a>,
|
||||
}
|
||||
|
||||
impl<'a> Dump<'a> {
|
||||
pub(crate) fn new(index_scheduler: &'a mut IndexScheduler) -> Result<Self> {
|
||||
// While loading a dump no one should be able to access the scheduler thus I can block everything.
|
||||
let wtxn = index_scheduler.env.write_txn()?;
|
||||
|
||||
Ok(Dump { index_scheduler, wtxn })
|
||||
}
|
||||
|
||||
/// Register a new task coming from a dump in the scheduler.
|
||||
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
|
||||
pub fn register_dumped_task(
|
||||
&mut self,
|
||||
task: TaskDump,
|
||||
content_file: Option<Box<UpdateFile>>,
|
||||
) -> Result<Task> {
|
||||
let content_uuid = match content_file {
|
||||
Some(content_file) if task.status == Status::Enqueued => {
|
||||
let (uuid, mut file) = self.index_scheduler.create_update_file()?;
|
||||
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
|
||||
for doc in content_file {
|
||||
builder.append_json_object(&doc?)?;
|
||||
}
|
||||
builder.into_inner()?;
|
||||
file.persist()?;
|
||||
|
||||
Some(uuid)
|
||||
}
|
||||
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
|
||||
// in case we try to open it later.
|
||||
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let task = Task {
|
||||
uid: task.uid,
|
||||
enqueued_at: task.enqueued_at,
|
||||
started_at: task.started_at,
|
||||
finished_at: task.finished_at,
|
||||
error: task.error,
|
||||
canceled_by: task.canceled_by,
|
||||
details: task.details,
|
||||
status: task.status,
|
||||
kind: match task.kind {
|
||||
KindDump::DocumentImport {
|
||||
primary_key,
|
||||
method,
|
||||
documents_count,
|
||||
allow_index_creation,
|
||||
} => KindWithContent::DocumentAdditionOrUpdate {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
primary_key,
|
||||
method,
|
||||
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
|
||||
documents_count,
|
||||
allow_index_creation,
|
||||
},
|
||||
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
|
||||
documents_ids,
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
},
|
||||
KindDump::DocumentClear => KindWithContent::DocumentClear {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
},
|
||||
KindDump::Settings { settings, is_deletion, allow_index_creation } => {
|
||||
KindWithContent::SettingsUpdate {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
new_settings: settings,
|
||||
is_deletion,
|
||||
allow_index_creation,
|
||||
}
|
||||
}
|
||||
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
},
|
||||
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
primary_key,
|
||||
},
|
||||
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
|
||||
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||||
primary_key,
|
||||
},
|
||||
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
|
||||
KindDump::TaskCancelation { query, tasks } => {
|
||||
KindWithContent::TaskCancelation { query, tasks }
|
||||
}
|
||||
KindDump::TasksDeletion { query, tasks } => {
|
||||
KindWithContent::TaskDeletion { query, tasks }
|
||||
}
|
||||
KindDump::DumpCreation { keys, instance_uid } => {
|
||||
KindWithContent::DumpCreation { keys, instance_uid }
|
||||
}
|
||||
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
|
||||
},
|
||||
};
|
||||
|
||||
self.index_scheduler.all_tasks.put(&mut self.wtxn, &BEU32::new(task.uid), &task)?;
|
||||
|
||||
for index in task.indexes() {
|
||||
self.index_scheduler.update_index(&mut self.wtxn, index, |bitmap| {
|
||||
bitmap.insert(task.uid);
|
||||
})?;
|
||||
}
|
||||
|
||||
self.index_scheduler.update_status(&mut self.wtxn, task.status, |bitmap| {
|
||||
bitmap.insert(task.uid);
|
||||
})?;
|
||||
|
||||
self.index_scheduler.update_kind(&mut self.wtxn, task.kind.as_kind(), |bitmap| {
|
||||
(bitmap.insert(task.uid));
|
||||
})?;
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
/// Commit all the changes and exit the importing dump state
|
||||
pub fn finish(self) -> Result<()> {
|
||||
self.wtxn.commit()?;
|
||||
self.index_scheduler.wake_up.signal();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// The outcome of calling the [`IndexScheduler::tick`] function.
|
||||
pub enum TickOutcome {
|
||||
/// The scheduler should immediately attempt another `tick`.
|
||||
|
@ -367,12 +367,14 @@ fn import_dump(
|
||||
log::info!("All documents successfully imported.");
|
||||
}
|
||||
|
||||
let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
|
||||
|
||||
// 4. Import the tasks.
|
||||
for ret in dump_reader.tasks()? {
|
||||
let (task, file) = ret?;
|
||||
index_scheduler.register_dumped_task(task, file)?;
|
||||
index_scheduler_dump.register_dumped_task(task, file)?;
|
||||
}
|
||||
Ok(())
|
||||
Ok(index_scheduler_dump.finish()?)
|
||||
}
|
||||
|
||||
pub fn configure_data(
|
||||
|
Loading…
Reference in New Issue
Block a user