From 8b7735c20a1779059a47ab58242b9f67320f0e46 Mon Sep 17 00:00:00 2001 From: tamo Date: Tue, 11 May 2021 00:20:55 +0200 Subject: [PATCH] move the import of the updates in the v2 and ignore the v1 for now --- .../src/index_controller/dump_actor/mod.rs | 39 +++++-------------- .../src/index_controller/dump_actor/v1.rs | 3 +- .../src/index_controller/dump_actor/v2.rs | 32 ++++++++++++++- 3 files changed, 41 insertions(+), 33 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs index a8409f623..d416d7d92 100644 --- a/meilisearch-http/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs @@ -16,11 +16,12 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tempfile::TempDir; use thiserror::Error; +use uuid::Uuid; use super::IndexMetadata; use crate::helpers::compression; use crate::index::Index; -use crate::index_controller::{uuid_resolver, UpdateStatus}; +use crate::index_controller::uuid_resolver; pub use actor::DumpActor; pub use handle_impl::*; @@ -53,13 +54,14 @@ impl DumpVersion { pub fn import_index( self, size: usize, + uuid: Uuid, dump_path: &Path, - index_path: &Path, + db_path: &Path, primary_key: Option<&str>, ) -> anyhow::Result<()> { match self { - Self::V1 => v1::import_index(size, dump_path, index_path, primary_key), - Self::V2 => v2::import_index(size, dump_path, index_path, primary_key), + Self::V1 => v1::import_index(size, uuid, dump_path, db_path, primary_key), + Self::V2 => v2::import_index(size, uuid, dump_path, db_path, primary_key), } } } @@ -200,46 +202,23 @@ pub fn load_dump( let dump_path = tmp_dir_path.join(&idx.uid); // this cannot fail since we created all the missing uuid in the previous loop let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap(); - let index_path = db_path.join(&format!("indexes/index-{}", uuid)); - // let update_path = db_path.join(&format!("updates")); info!( "Importing dump from {} into {}...", dump_path.display(), - index_path.display() + db_path.display() ); metadata .dump_version .import_index( size, + uuid, &dump_path, - &index_path, + &db_path, idx.meta.primary_key.as_ref().map(|s| s.as_ref()), ) .unwrap(); info!("Dump importation from {} succeed", dump_path.display()); - - info!("importing the updates"); - use crate::index_controller::update_actor::UpdateStore; - use std::io::BufRead; - - let update_path = db_path.join("updates"); - let options = EnvOpenOptions::new(); - // create an UpdateStore to import the updates - std::fs::create_dir_all(&update_path)?; - let (update_store, _) = UpdateStore::create(options, &update_path)?; - let file = File::open(&dump_path.join("updates.jsonl"))?; - let reader = std::io::BufReader::new(file); - - let mut wtxn = update_store.env.write_txn()?; - for update in reader.lines() { - let mut update: UpdateStatus = serde_json::from_str(&update?)?; - if let Some(path) = update.content_path_mut() { - *path = update_path.join("update_files").join(&path).into(); - } - update_store.register_raw_updates(&mut wtxn, update, uuid)?; - } - wtxn.commit()?; } // finally we can move all the unprocessed update file into our new DB diff --git a/meilisearch-http/src/index_controller/dump_actor/v1.rs b/meilisearch-http/src/index_controller/dump_actor/v1.rs index 33fab6930..fad48dd8f 100644 --- a/meilisearch-http/src/index_controller/dump_actor/v1.rs +++ b/meilisearch-http/src/index_controller/dump_actor/v1.rs @@ -79,7 +79,8 @@ fn import_settings(dir_path: &Path) -> anyhow::Result { } -pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { +pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { + let index_path = db_path.join(&format!("indexes/index-{}", uuid)); info!("Importing a dump from an old version of meilisearch with dump version 1"); std::fs::create_dir_all(&index_path)?; diff --git a/meilisearch-http/src/index_controller/dump_actor/v2.rs b/meilisearch-http/src/index_controller/dump_actor/v2.rs index d8f43fc58..969442296 100644 --- a/meilisearch-http/src/index_controller/dump_actor/v2.rs +++ b/meilisearch-http/src/index_controller/dump_actor/v2.rs @@ -1,4 +1,8 @@ use heed::EnvOpenOptions; +use log::info; +use uuid::Uuid; +use crate::index_controller::{UpdateStatus, update_actor::UpdateStore}; +use std::io::BufRead; use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}}; use crate::index::{Checked, Index}; use crate::index_controller::Settings; @@ -14,13 +18,15 @@ fn import_settings(dir_path: &Path) -> anyhow::Result> { Ok(metadata) } -pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { +pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { + let index_path = db_path.join(&format!("indexes/index-{}", uuid)); std::fs::create_dir_all(&index_path)?; let mut options = EnvOpenOptions::new(); options.map_size(size); let index = milli::Index::new(options, index_path)?; let index = Index(Arc::new(index)); + info!("importing the settings..."); // extract `settings.json` file and import content let settings = import_settings(&dump_path)?; let update_builder = UpdateBuilder::new(0); @@ -31,6 +37,7 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_ke let file = File::open(&dump_path.join("documents.jsonl"))?; let reader = std::io::BufReader::new(file); + info!("importing the documents..."); // TODO: TAMO: currently we ignore any error caused by the importation of the documents because // if there is no documents nor primary key it'll throw an anyhow error, but we must remove // this before the merge on main @@ -49,6 +56,27 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_ke .prepare_for_closing() .wait(); - Ok(()) + info!("importing the updates..."); + import_updates(uuid, dump_path, db_path) } +fn import_updates(uuid: Uuid, dump_path: &Path, db_path: &Path) -> anyhow::Result<()> { + let update_path = db_path.join("updates"); + let options = EnvOpenOptions::new(); + // create an UpdateStore to import the updates + std::fs::create_dir_all(&update_path)?; + let (update_store, _) = UpdateStore::create(options, &update_path)?; + let file = File::open(&dump_path.join("updates.jsonl"))?; + let reader = std::io::BufReader::new(file); + + let mut wtxn = update_store.env.write_txn()?; + for update in reader.lines() { + let mut update: UpdateStatus = serde_json::from_str(&update?)?; + if let Some(path) = update.content_path_mut() { + *path = update_path.join("update_files").join(&path).into(); + } + update_store.register_raw_updates(&mut wtxn, update, uuid)?; + } + wtxn.commit()?; + Ok(()) +}