diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs
index f57c27c59..eb2bc4684 100644
--- a/meilisearch-http/src/index_controller/dump_actor/mod.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs
@@ -59,10 +59,11 @@ impl DumpVersion {
         size: usize,
         dump_path: &Path,
         index_path: &Path,
+        primary_key: Option<&str>,
     ) -> anyhow::Result<()> {
         match self {
-            Self::V1 => v1::import_index(size, dump_path, index_path),
-            Self::V2 => v2::import_index(size, dump_path, index_path),
+            Self::V1 => v1::import_index(size, dump_path, index_path, primary_key),
+            Self::V2 => v2::import_index(size, dump_path, index_path, primary_key),
         }
     }
 }
@@ -206,7 +207,26 @@ pub fn load_dump(
         // this cannot fail since we created all the missing uuid in the previous loop
         let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
         let index_path = db_path.join(&format!("indexes/index-{}", uuid));
-        // let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db
+        // let update_path = db_path.join(&format!("updates"));
+
+        info!("importing the updates");
+        use crate::index_controller::update_actor::UpdateStore;
+        use std::io::BufRead;
+
+        let update_path = db_path.join("updates");
+        let options = EnvOpenOptions::new();
+        // create an UpdateStore to import the updates
+        std::fs::create_dir_all(&update_path)?;
+        let (update_store, _) = UpdateStore::create(options, update_path)?;
+        let file = File::open(&dump_path.join("updates.jsonl"))?;
+        let reader = std::io::BufReader::new(file);
+
+        let mut wtxn = update_store.env.write_txn()?;
+        for update in reader.lines() {
+            let update = serde_json::from_str(&update?)?;
+            update_store.register_raw_updates(&mut wtxn, update, uuid)?;
+        }
+        wtxn.commit()?;
 
         info!(
             "Importing dump from {} into {}...",
@@ -215,11 +235,12 @@
         );
         metadata
             .dump_version
-            .import_index(size, &dump_path, &index_path)
+            .import_index(size, &dump_path, &index_path, idx.meta.primary_key.as_ref().map(|s| s.as_ref()))
             .unwrap();
         info!("Dump importation from {} succeed", dump_path.display());
     }
 
+    info!("Dump importation from {} succeed", dump_path.display());
     Ok(())
 }
 
diff --git a/meilisearch-http/src/index_controller/dump_actor/v1.rs b/meilisearch-http/src/index_controller/dump_actor/v1.rs
index f22120849..d20723e8c 100644
--- a/meilisearch-http/src/index_controller/dump_actor/v1.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/v1.rs
@@ -78,7 +78,7 @@ fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
 }
 
-pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
+pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
     info!("Importing a dump from an old version of meilisearch with dump version 1");
 
     std::fs::create_dir_all(&index_path)?;
@@ -102,7 +102,7 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
         IndexDocumentsMethod::ReplaceDocuments,
         Some(reader),
         update_builder,
-        None,
+        primary_key,
     )?;
 
     // at this point we should handle the updates, but since the update logic is not handled in
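
The `load_dump` hunk above reads `updates.jsonl` line by line, deserializes each line, and registers it through `register_raw_updates` inside a single write transaction. A minimal, self-contained sketch of that read path, assuming serde, serde_json, and anyhow as dependencies; `RawUpdate` and its fields are hypothetical stand-ins for the real `UpdateStatus`:

    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use std::path::Path;

    use serde::Deserialize;

    // Hypothetical stand-in for the `UpdateStatus` payload stored in the
    // dump; the real type and fields differ.
    #[derive(Debug, Deserialize)]
    struct RawUpdate {
        update_id: u64,
        status: String,
    }

    fn read_updates(path: &Path) -> anyhow::Result<Vec<RawUpdate>> {
        let file = File::open(path)?;
        let reader = BufReader::new(file);
        let mut updates = Vec::new();
        // One JSON document per line, mirroring the loop added in `load_dump`.
        for line in reader.lines() {
            updates.push(serde_json::from_str(&line?)?);
        }
        Ok(updates)
    }

    fn main() -> anyhow::Result<()> {
        for update in read_updates(Path::new("updates.jsonl"))? {
            println!("{:?}", update);
        }
        Ok(())
    }

Reading one JSON document per line keeps memory usage flat regardless of how many updates the dump contains, which is presumably why the dump stores updates as JSON lines rather than one big array.
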
diff --git a/meilisearch-http/src/index_controller/dump_actor/v2.rs b/meilisearch-http/src/index_controller/dump_actor/v2.rs
index 5c5e5fb2d..301268233 100644
--- a/meilisearch-http/src/index_controller/dump_actor/v2.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/v2.rs
@@ -1,5 +1,5 @@
 use heed::EnvOpenOptions;
-use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
+use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}};
 use crate::index::Index;
 use crate::index_controller::Settings;
 use std::{fs::File, path::Path, sync::Arc};
@@ -14,7 +14,7 @@ fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
     Ok(metadata)
 }
 
-pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
+pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
     std::fs::create_dir_all(&index_path)?;
     let mut options = EnvOpenOptions::new();
     options.map_size(size);
@@ -26,17 +26,21 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
     let update_builder = UpdateBuilder::new(0);
     index.update_settings(&settings, update_builder)?;
 
+    // import the documents in the index
     let update_builder = UpdateBuilder::new(1);
     let file = File::open(&dump_path.join("documents.jsonl"))?;
     let reader = std::io::BufReader::new(file);
 
-    index.update_documents(
+    // TODO: TAMO: currently we ignore any error caused by the importation of the documents
+    // because if there are no documents and no primary key it returns an anyhow error, but we
+    // must remove this before merging into main
+    let _ = index.update_documents(
         UpdateFormat::JsonStream,
         IndexDocumentsMethod::ReplaceDocuments,
         Some(reader),
         update_builder,
-        None,
-    )?;
+        primary_key,
+    );
 
     // the last step: we extract the original milli::Index and close it
     Arc::try_unwrap(index.0)
diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs
index cf6a81223..3b92b1078 100644
--- a/meilisearch-http/src/index_controller/index_actor/mod.rs
+++ b/meilisearch-http/src/index_controller/index_actor/mod.rs
@@ -31,7 +31,7 @@ pub type IndexResult<T> = std::result::Result<T, IndexError>;
 pub struct IndexMeta {
     created_at: DateTime<Utc>,
     pub updated_at: DateTime<Utc>,
-    primary_key: Option<String>,
+    pub primary_key: Option<String>,
 }
 
 impl IndexMeta {
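
Making `primary_key` public on `IndexMeta` above is what lets the `load_dump` call site pass `idx.meta.primary_key.as_ref().map(|s| s.as_ref())` down to `import_index`, which takes an `Option<&str>`. A minimal sketch of that `Option<String>` to `Option<&str>` conversion; `Meta` is a hypothetical stand-in for `IndexMeta`:

    // `Meta` is a hypothetical stand-in for `IndexMeta`.
    struct Meta {
        primary_key: Option<String>,
    }

    fn main() {
        let meta = Meta { primary_key: Some("id".to_string()) };
        // `as_ref()` turns `&Option<String>` into `Option<&String>`, and
        // `map(|s| s.as_ref())` turns each `&String` into a `&str`.
        let primary_key: Option<&str> = meta.primary_key.as_ref().map(|s| s.as_ref());
        assert_eq!(primary_key, Some("id"));
    }

The borrow matters here because `import_index` only needs to read the key; taking `Option<&str>` avoids cloning the `String` out of the index metadata.
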
diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs
index 524fefe84..745311f05 100644
--- a/meilisearch-http/src/index_controller/update_actor/update_store.rs
+++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs
@@ -250,21 +250,31 @@ impl UpdateStore {
             .get(txn, &NextIdKey::Global)?
             .map(U64::get)
             .unwrap_or_default();
+
+        self.next_update_id
+            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
+
+        let update_id = self.next_update_id_raw(txn, index_uuid)?;
+
+        Ok((global_id, update_id))
+    }
+
+    /// Returns the next update id for a given `index_uuid` without
+    /// incrementing the global update id. This is useful for the dumps.
+    fn next_update_id_raw(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<u64> {
         let update_id = self
            .next_update_id
            .get(txn, &NextIdKey::Index(index_uuid))?
            .map(U64::get)
            .unwrap_or_default();
 
-        self.next_update_id
-            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
         self.next_update_id.put(
             txn,
             &NextIdKey::Index(index_uuid),
             &BEU64::new(update_id + 1),
         )?;
 
-        Ok((global_id, update_id))
+        Ok(update_id)
     }
 
     /// Registers the update content in the pending store and the meta
@@ -291,17 +301,27 @@
         Ok(meta)
     }
 
-    /// Push already processed updates in the UpdateStore. This is useful for the dumps
-    pub fn register_already_processed_update (
+    /// Push an already processed update into the UpdateStore without triggering the
+    /// notification process. This is useful for the dumps.
+    pub fn register_raw_updates(
         &self,
-        result: UpdateStatus,
+        wtxn: &mut heed::RwTxn,
+        update: UpdateStatus,
         index_uuid: Uuid,
     ) -> heed::Result<()> {
-        // TODO: TAMO: load already processed updates
-        let mut wtxn = self.env.write_txn()?;
-        let (_global_id, update_id) = self.next_update_id(&mut wtxn, index_uuid)?;
-        self.updates.remap_key_type::().put(&mut wtxn, &(index_uuid, update_id), &result)?;
-        wtxn.commit()
+        // TODO: TAMO: since I don't want to store anything I currently generate a new global ID
+        // every time I encounter an enqueued update, can we do better?
+        match update {
+            UpdateStatus::Enqueued(enqueued) => {
+                let (global_id, update_id) = self.next_update_id(wtxn, index_uuid)?;
+                self.pending_queue.remap_key_type::().put(wtxn, &(global_id, index_uuid, update_id), &enqueued)?;
+            }
+            _ => {
+                let update_id = self.next_update_id_raw(wtxn, index_uuid)?;
+                self.updates.remap_key_type::().put(wtxn, &(index_uuid, update_id), &update)?;
+            }
+        }
+        Ok(())
     }
 
     /// Executes the user provided function on the next pending update (the one with the lowest id).
@@ -542,9 +562,6 @@
             }
         }
 
-        // TODO: TAMO: the updates
-        // already processed updates seems to works, but I've not tried with currently running updates
-
         let update_files_path = path.join("update_files");
         create_dir_all(&update_files_path)?;
 
@@ -561,7 +578,6 @@
             }
         }
 
-
         // Perform the dump of each index concurently. Only a third of the capabilities of
         // the index actor at a time not to put too much pressure on the index actor
         let path = &path;
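
The split between `next_update_id` and `next_update_id_raw` above is the core of this change: an enqueued update still needs a fresh global id so it keeps its place in the pending queue, while an already processed update imported from a dump only needs its per-index id. A toy model of that two-counter scheme, covering only the increment rules and none of the heed persistence:

    use std::collections::HashMap;

    // Toy model of the two-level id scheme in `UpdateStore`: one global
    // counter shared by all indexes (it orders the pending queue) and one
    // counter per index. This is an illustration, not the real store.
    #[derive(Default)]
    struct IdGen {
        global: u64,
        per_index: HashMap<&'static str, u64>,
    }

    impl IdGen {
        // Mirrors `next_update_id`: bumps both counters.
        fn next(&mut self, index: &'static str) -> (u64, u64) {
            let global = self.global;
            self.global += 1;
            (global, self.next_raw(index))
        }

        // Mirrors `next_update_id_raw`: bumps only the per-index counter.
        fn next_raw(&mut self, index: &'static str) -> u64 {
            let id = self.per_index.entry(index).or_default();
            let current = *id;
            *id += 1;
            current
        }
    }

    fn main() {
        let mut ids = IdGen::default();
        assert_eq!(ids.next("movies"), (0, 0));
        assert_eq!(ids.next_raw("movies"), 1); // global counter untouched
        assert_eq!(ids.next("movies"), (1, 2));
    }

In the real store both counters live in heed under `NextIdKey::Global` and `NextIdKey::Index(uuid)`; the sketch only models which counter each method advances.
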