diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index e23e4ff8b..3e7c85148 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -43,7 +43,7 @@ pub use error::Error; use file_store::FileStore; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; -use meilisearch_types::heed::{self, Database, Env, RoTxn}; +use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn}; use meilisearch_types::milli; use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::update::IndexerConfig; @@ -883,115 +883,8 @@ impl IndexScheduler { /// Register a new task coming from a dump in the scheduler. /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running. - pub fn register_dumped_task( - &mut self, - task: TaskDump, - content_file: Option>, - ) -> Result { - // Currently we don't need to access the tasks queue while loading a dump thus I can block everything. - let mut wtxn = self.env.write_txn()?; - - let content_uuid = match content_file { - Some(content_file) if task.status == Status::Enqueued => { - let (uuid, mut file) = self.create_update_file()?; - let mut builder = DocumentsBatchBuilder::new(file.as_file_mut()); - for doc in content_file { - builder.append_json_object(&doc?)?; - } - builder.into_inner()?; - file.persist()?; - - Some(uuid) - } - // If the task isn't `Enqueued` then just generate a recognisable `Uuid` - // in case we try to open it later. - _ if task.status != Status::Enqueued => Some(Uuid::nil()), - _ => None, - }; - - let task = Task { - uid: task.uid, - enqueued_at: task.enqueued_at, - started_at: task.started_at, - finished_at: task.finished_at, - error: task.error, - canceled_by: task.canceled_by, - details: task.details, - status: task.status, - kind: match task.kind { - KindDump::DocumentImport { - primary_key, - method, - documents_count, - allow_index_creation, - } => KindWithContent::DocumentAdditionOrUpdate { - index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, - primary_key, - method, - content_file: content_uuid.ok_or(Error::CorruptedDump)?, - documents_count, - allow_index_creation, - }, - KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion { - documents_ids, - index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, - }, - KindDump::DocumentClear => KindWithContent::DocumentClear { - index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, - }, - KindDump::Settings { settings, is_deletion, allow_index_creation } => { - KindWithContent::SettingsUpdate { - index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, - new_settings: settings, - is_deletion, - allow_index_creation, - } - } - KindDump::IndexDeletion => KindWithContent::IndexDeletion { - index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, - }, - KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation { - index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, - primary_key, - }, - KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate { - index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, - primary_key, - }, - KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps }, - KindDump::TaskCancelation { query, tasks } => { - KindWithContent::TaskCancelation { query, tasks } - } - KindDump::TasksDeletion { query, tasks } => { - KindWithContent::TaskDeletion { query, tasks } - } - KindDump::DumpCreation { keys, instance_uid } => { - KindWithContent::DumpCreation { keys, instance_uid } - } - KindDump::SnapshotCreation => KindWithContent::SnapshotCreation, - }, - }; - - self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?; - - for index in task.indexes() { - self.update_index(&mut wtxn, index, |bitmap| { - bitmap.insert(task.uid); - })?; - } - - self.update_status(&mut wtxn, task.status, |bitmap| { - bitmap.insert(task.uid); - })?; - - self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| { - (bitmap.insert(task.uid)); - })?; - - wtxn.commit()?; - self.wake_up.signal(); - - Ok(task) + pub fn register_dumped_task(&mut self) -> Result { + Dump::new(self) } /// Create a new index without any associated task. @@ -1218,6 +1111,134 @@ impl IndexScheduler { } } +pub struct Dump<'a> { + index_scheduler: &'a IndexScheduler, + wtxn: RwTxn<'a, 'a>, +} + +impl<'a> Dump<'a> { + pub(crate) fn new(index_scheduler: &'a mut IndexScheduler) -> Result { + // While loading a dump no one should be able to access the scheduler thus I can block everything. + let wtxn = index_scheduler.env.write_txn()?; + + Ok(Dump { index_scheduler, wtxn }) + } + + /// Register a new task coming from a dump in the scheduler. + /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running. + pub fn register_dumped_task( + &mut self, + task: TaskDump, + content_file: Option>, + ) -> Result { + let content_uuid = match content_file { + Some(content_file) if task.status == Status::Enqueued => { + let (uuid, mut file) = self.index_scheduler.create_update_file()?; + let mut builder = DocumentsBatchBuilder::new(file.as_file_mut()); + for doc in content_file { + builder.append_json_object(&doc?)?; + } + builder.into_inner()?; + file.persist()?; + + Some(uuid) + } + // If the task isn't `Enqueued` then just generate a recognisable `Uuid` + // in case we try to open it later. + _ if task.status != Status::Enqueued => Some(Uuid::nil()), + _ => None, + }; + + let task = Task { + uid: task.uid, + enqueued_at: task.enqueued_at, + started_at: task.started_at, + finished_at: task.finished_at, + error: task.error, + canceled_by: task.canceled_by, + details: task.details, + status: task.status, + kind: match task.kind { + KindDump::DocumentImport { + primary_key, + method, + documents_count, + allow_index_creation, + } => KindWithContent::DocumentAdditionOrUpdate { + index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, + primary_key, + method, + content_file: content_uuid.ok_or(Error::CorruptedDump)?, + documents_count, + allow_index_creation, + }, + KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion { + documents_ids, + index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, + }, + KindDump::DocumentClear => KindWithContent::DocumentClear { + index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, + }, + KindDump::Settings { settings, is_deletion, allow_index_creation } => { + KindWithContent::SettingsUpdate { + index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, + new_settings: settings, + is_deletion, + allow_index_creation, + } + } + KindDump::IndexDeletion => KindWithContent::IndexDeletion { + index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, + }, + KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation { + index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, + primary_key, + }, + KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate { + index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, + primary_key, + }, + KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps }, + KindDump::TaskCancelation { query, tasks } => { + KindWithContent::TaskCancelation { query, tasks } + } + KindDump::TasksDeletion { query, tasks } => { + KindWithContent::TaskDeletion { query, tasks } + } + KindDump::DumpCreation { keys, instance_uid } => { + KindWithContent::DumpCreation { keys, instance_uid } + } + KindDump::SnapshotCreation => KindWithContent::SnapshotCreation, + }, + }; + + self.index_scheduler.all_tasks.put(&mut self.wtxn, &BEU32::new(task.uid), &task)?; + + for index in task.indexes() { + self.index_scheduler.update_index(&mut self.wtxn, index, |bitmap| { + bitmap.insert(task.uid); + })?; + } + + self.index_scheduler.update_status(&mut self.wtxn, task.status, |bitmap| { + bitmap.insert(task.uid); + })?; + + self.index_scheduler.update_kind(&mut self.wtxn, task.kind.as_kind(), |bitmap| { + (bitmap.insert(task.uid)); + })?; + + Ok(task) + } + + /// Commit all the changes and exit the importing dump state + pub fn finish(self) -> Result<()> { + self.wtxn.commit()?; + self.index_scheduler.wake_up.signal(); + Ok(()) + } +} + /// The outcome of calling the [`IndexScheduler::tick`] function. pub enum TickOutcome { /// The scheduler should immediately attempt another `tick`. diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 13c236983..98e754e67 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -367,12 +367,14 @@ fn import_dump( log::info!("All documents successfully imported."); } + let mut index_scheduler_dump = index_scheduler.register_dumped_task()?; + // 4. Import the tasks. for ret in dump_reader.tasks()? { let (task, file) = ret?; - index_scheduler.register_dumped_task(task, file)?; + index_scheduler_dump.register_dumped_task(task, file)?; } - Ok(()) + Ok(index_scheduler_dump.finish()?) } pub fn configure_data(