diff --git a/meilisearch-http/src/dump.rs b/meilisearch-http/src/dump.rs deleted file mode 100644 index 544fffaa7..000000000 --- a/meilisearch-http/src/dump.rs +++ /dev/null @@ -1,423 +0,0 @@ -use std::fs::{create_dir_all, File}; -use std::io::prelude::*; -use std::path::{Path, PathBuf}; -use std::sync::Mutex; -use std::thread; - -use actix_web::web; -use chrono::offset::Utc; -use indexmap::IndexMap; -use log::{error, info}; -use once_cell::sync::Lazy; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use tempfile::TempDir; - -use crate::Data; -use crate::error::{Error, ResponseError}; -use crate::helpers::compression; -use crate::routes::index; -use crate::routes::setting::Settings; -use crate::routes::index::IndexResponse; - -// Mutex to share dump progress. -static DUMP_INFO: Lazy>> = Lazy::new(Mutex::default); - -#[derive(Debug, Serialize, Deserialize, Copy, Clone)] -enum DumpVersion { - V1, -} - -impl DumpVersion { - const CURRENT: Self = Self::V1; -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct DumpMetadata { - indexes: Vec, - db_version: String, - dump_version: DumpVersion, -} - -impl DumpMetadata { - /// Create a DumpMetadata with the current dump version of meilisearch. - pub fn new(indexes: Vec, db_version: String) -> Self { - DumpMetadata { - indexes, - db_version, - dump_version: DumpVersion::CURRENT, - } - } - - /// Extract DumpMetadata from `metadata.json` file present at provided `dir_path` - fn from_path(dir_path: &Path) -> Result { - let path = dir_path.join("metadata.json"); - let file = File::open(path)?; - let reader = std::io::BufReader::new(file); - let metadata = serde_json::from_reader(reader)?; - - Ok(metadata) - } - - /// Write DumpMetadata in `metadata.json` file at provided `dir_path` - fn to_path(&self, dir_path: &Path) -> Result<(), Error> { - let path = dir_path.join("metadata.json"); - let file = File::create(path)?; - - serde_json::to_writer(file, &self)?; - - Ok(()) - } -} - -/// Extract Settings from `settings.json` file present at provided `dir_path` -fn settings_from_path(dir_path: &Path) -> Result { - let path = dir_path.join("settings.json"); - let file = File::open(path)?; - let reader = std::io::BufReader::new(file); - let metadata = serde_json::from_reader(reader)?; - - Ok(metadata) -} - -/// Write Settings in `settings.json` file at provided `dir_path` -fn settings_to_path(settings: &Settings, dir_path: &Path) -> Result<(), Error> { - let path = dir_path.join("settings.json"); - let file = File::create(path)?; - - serde_json::to_writer(file, settings)?; - - Ok(()) -} - -/// Import settings and documents of a dump with version `DumpVersion::V1` in specified index. 
-fn import_index_v1(
-    data: &Data,
-    dumps_dir: &Path,
-    index_uid: &str,
-    document_batch_size: usize,
-    write_txn: &mut MainWriter,
-) -> Result<(), Error> {
-
-    // open index
-    let index = data
-        .db
-        .open_index(index_uid)
-        .ok_or(Error::index_not_found(index_uid))?;
-
-    // index dir path in dump dir
-    let index_path = &dumps_dir.join(index_uid);
-
-    // extract `settings.json` file and import content
-    let settings = settings_from_path(&index_path)?;
-    let settings = settings.to_update().map_err(|e| Error::dump_failed(format!("importing settings for index {}; {}", index_uid, e)))?;
-    apply_settings_update(write_txn, &index, settings)?;
-
-    // create iterator over documents in `documents.jsonl` to make batch importation
-    let documents = {
-        let file = File::open(&index_path.join("documents.jsonl"))?;
-        let reader = std::io::BufReader::new(file);
-        let deserializer = serde_json::Deserializer::from_reader(reader);
-        deserializer.into_iter::<IndexMap<String, serde_json::Value>>()
-    };
-
-    // batch import document every `document_batch_size`:
-    // create a Vec to bufferize documents
-    let mut values = Vec::with_capacity(document_batch_size);
-    // iterate over documents
-    for document in documents {
-        // push document in buffer
-        values.push(document?);
-        // if buffer is full, create and apply a batch, and clean buffer
-        if values.len() == document_batch_size {
-            let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size));
-            apply_documents_addition(write_txn, &index, batch)?;
-        }
-    }
-
-    // apply documents remaining in the buffer
-    if !values.is_empty() {
-        apply_documents_addition(write_txn, &index, values)?;
-    }
-
-    // sync index information: stats, updated_at, last_update
-    if let Err(e) = crate::index_update_callback_txn(index, index_uid, data, write_txn) {
-        return Err(Error::Internal(e));
-    }
-
-    Ok(())
-}
-
-/// Import dump from `dump_path` in database.
-pub fn import_dump( - data: &Data, - dump_path: &Path, - document_batch_size: usize, -) -> Result<(), Error> { - info!("Importing dump from {:?}...", dump_path); - - // create a temporary directory - let tmp_dir = TempDir::new()?; - let tmp_dir_path = tmp_dir.path(); - - // extract dump in temporary directory - compression::from_tar_gz(dump_path, tmp_dir_path)?; - - // read dump metadata - let metadata = DumpMetadata::from_path(&tmp_dir_path)?; - - // choose importation function from DumpVersion of metadata - let import_index = match metadata.dump_version { - DumpVersion::V1 => import_index_v1, - }; - - // remove indexes which have same `uid` than indexes to import and create empty indexes - let existing_index_uids = data.db.indexes_uids(); - for index in metadata.indexes.iter() { - if existing_index_uids.contains(&index.uid) { - data.db.delete_index(index.uid.clone())?; - } - index::create_index_sync(&data.db, index.uid.clone(), index.name.clone(), index.primary_key.clone())?; - } - - // import each indexes content - data.db.main_write::<_, _, Error>(|mut writer| { - for index in metadata.indexes { - import_index(&data, tmp_dir_path, &index.uid, document_batch_size, &mut writer)?; - } - Ok(()) - })?; - - info!("Dump importation from {:?} succeed", dump_path); - Ok(()) -} - -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum DumpStatus { - Done, - InProgress, - Failed, -} - -#[derive(Debug, Serialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct DumpInfo { - pub uid: String, - pub status: DumpStatus, - #[serde(skip_serializing_if = "Option::is_none", flatten)] - pub error: Option, -} - -impl DumpInfo { - pub fn new(uid: String, status: DumpStatus) -> Self { - Self { uid, status, error: None } - } - - pub fn with_error(mut self, error: ResponseError) -> Self { - self.status = DumpStatus::Failed; - self.error = Some(json!(error)); - - self - } - - pub fn dump_already_in_progress(&self) -> bool { - self.status == DumpStatus::InProgress - } - - pub fn get_current() -> Option { - DUMP_INFO.lock().unwrap().clone() - } - - pub fn set_current(&self) { - *DUMP_INFO.lock().unwrap() = Some(self.clone()); - } -} - -/// Generate uid from creation date -fn generate_uid() -> String { - Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() -} - -/// Infer dumps_dir from dump_uid -pub fn compressed_dumps_dir(dumps_dir: &Path, dump_uid: &str) -> PathBuf { - dumps_dir.join(format!("{}.dump", dump_uid)) -} - -/// Write metadata in dump -fn dump_metadata(data: &web::Data, dir_path: &Path, indexes: Vec) -> Result<(), Error> { - let (db_major, db_minor, db_patch) = data.db.version(); - let metadata = DumpMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch)); - - metadata.to_path(dir_path) -} - -/// Export settings of provided index in dump -fn dump_index_settings(data: &web::Data, reader: &MainReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> { - let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?; - - settings_to_path(&settings, dir_path) -} - -/// Export updates of provided index in dump -fn dump_index_updates(data: &web::Data, reader: &UpdateReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> { - let updates_path = dir_path.join("updates.jsonl"); - let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?; - - let file = File::create(updates_path)?; - - for update in updates { - serde_json::to_writer(&file, &update)?; - writeln!(&file)?; - } - - 
Ok(()) -} - -/// Export documents of provided index in dump -fn dump_index_documents(data: &web::Data, reader: &MainReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> { - let documents_path = dir_path.join("documents.jsonl"); - let file = File::create(documents_path)?; - let dump_batch_size = data.dump_batch_size; - - let mut offset = 0; - loop { - let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, dump_batch_size, None)?; - if documents.is_empty() { break; } else { offset += dump_batch_size; } - - for document in documents { - serde_json::to_writer(&file, &document)?; - writeln!(&file)?; - } - } - - Ok(()) -} - -/// Write error with a context. -fn fail_dump_process(dump_info: DumpInfo, context: &str, error: E) { - let error_message = format!("{}; {}", context, error); - - error!("Something went wrong during dump process: {}", &error_message); - dump_info.with_error(Error::dump_failed(error_message).into()).set_current(); -} - -/// Main function of dump. -fn dump_process(data: web::Data, dumps_dir: PathBuf, dump_info: DumpInfo) { - // open read transaction on Update - let update_reader = match data.db.update_read_txn() { - Ok(r) => r, - Err(e) => { - fail_dump_process(dump_info, "creating RO transaction on updates", e); - return ; - } - }; - - // open read transaction on Main - let main_reader = match data.db.main_read_txn() { - Ok(r) => r, - Err(e) => { - fail_dump_process(dump_info, "creating RO transaction on main", e); - return ; - } - }; - - // create a temporary directory - let tmp_dir = match TempDir::new() { - Ok(tmp_dir) => tmp_dir, - Err(e) => { - fail_dump_process(dump_info, "creating temporary directory", e); - return ; - } - }; - let tmp_dir_path = tmp_dir.path(); - - // fetch indexes - let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) { - Ok(indexes) => indexes, - Err(e) => { - fail_dump_process(dump_info, "listing indexes", e); - return ; - } - }; - - // create metadata - if let Err(e) = dump_metadata(&data, &tmp_dir_path, indexes.clone()) { - fail_dump_process(dump_info, "generating metadata", e); - return ; - } - - // export settings, updates and documents for each indexes - for index in indexes { - let index_path = tmp_dir_path.join(&index.uid); - - // create index sub-dircetory - if let Err(e) = create_dir_all(&index_path) { - fail_dump_process(dump_info, &format!("creating directory for index {}", &index.uid), e); - return ; - } - - // export settings - if let Err(e) = dump_index_settings(&data, &main_reader, &index_path, &index.uid) { - fail_dump_process(dump_info, &format!("generating settings for index {}", &index.uid), e); - return ; - } - - // export documents - if let Err(e) = dump_index_documents(&data, &main_reader, &index_path, &index.uid) { - fail_dump_process(dump_info, &format!("generating documents for index {}", &index.uid), e); - return ; - } - - // export updates - if let Err(e) = dump_index_updates(&data, &update_reader, &index_path, &index.uid) { - fail_dump_process(dump_info, &format!("generating updates for index {}", &index.uid), e); - return ; - } - } - - // compress dump in a file named `{dump_uid}.dump` in `dumps_dir` - if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_dir(&dumps_dir, &dump_info.uid)) { - fail_dump_process(dump_info, "compressing dump", e); - return ; - } - - // update dump info to `done` - let resume = DumpInfo::new( - dump_info.uid, - DumpStatus::Done - ); - - resume.set_current(); -} - -pub fn 
init_dump_process(data: &web::Data, dumps_dir: &Path) -> Result { - create_dir_all(dumps_dir).map_err(|e| Error::dump_failed(format!("creating temporary directory {}", e)))?; - - // check if a dump is already in progress - if let Some(resume) = DumpInfo::get_current() { - if resume.dump_already_in_progress() { - return Err(Error::dump_conflict()) - } - } - - // generate a new dump info - let info = DumpInfo::new( - generate_uid(), - DumpStatus::InProgress - ); - - info.set_current(); - - let data = data.clone(); - let dumps_dir = dumps_dir.to_path_buf(); - let info_cloned = info.clone(); - // run dump process in a new thread - thread::spawn(move || - dump_process(data, dumps_dir, info_cloned) - ); - - Ok(info) -} diff --git a/meilisearch-http/src/index_controller/dump.rs b/meilisearch-http/src/index_controller/dump.rs new file mode 100644 index 000000000..afdcfd9ce --- /dev/null +++ b/meilisearch-http/src/index_controller/dump.rs @@ -0,0 +1,258 @@ +use std::{ + fs::File, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::bail; +use heed::EnvOpenOptions; +use log::{error, info}; +use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; +use serde::{Deserialize, Serialize}; +use tempfile::TempDir; +use tokio::fs; +use tokio::task::spawn_blocking; + +use super::update_actor::UpdateActorHandle; +use super::uuid_resolver::UuidResolverHandle; +use super::IndexMetadata; +use crate::index::Index; +use crate::index_controller::uuid_resolver; +use crate::{helpers::compression, index::Settings}; + +#[derive(Debug, Serialize, Deserialize, Copy, Clone)] +enum DumpVersion { + V1, +} + +impl DumpVersion { + const CURRENT: Self = Self::V1; +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DumpMetadata { + indexes: Vec, + db_version: String, + dump_version: DumpVersion, +} + +impl DumpMetadata { + /// Create a DumpMetadata with the current dump version of meilisearch. 
+    pub fn new(indexes: Vec<IndexMetadata>, db_version: String) -> Self {
+        DumpMetadata {
+            indexes,
+            db_version,
+            dump_version: DumpVersion::CURRENT,
+        }
+    }
+
+    /// Extract DumpMetadata from `metadata.json` file present at provided `dir_path`
+    fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
+        let path = dir_path.join("metadata.json");
+        let file = File::open(path)?;
+        let reader = std::io::BufReader::new(file);
+        let metadata = serde_json::from_reader(reader)?;
+
+        Ok(metadata)
+    }
+
+    /// Write DumpMetadata in `metadata.json` file at provided `dir_path`
+    fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> {
+        let path = dir_path.join("metadata.json");
+        let file = File::create(path)?;
+
+        serde_json::to_writer(file, &self)?;
+
+        Ok(())
+    }
+}
+
+pub struct DumpService<U, R> {
+    uuid_resolver_handle: R,
+    update_handle: U,
+    dump_path: PathBuf,
+    db_name: String,
+}
+
+impl<U, R> DumpService<U, R>
+where
+    U: UpdateActorHandle,
+    R: UuidResolverHandle,
+{
+    pub fn new(
+        uuid_resolver_handle: R,
+        update_handle: U,
+        dump_path: PathBuf,
+        db_name: String,
+    ) -> Self {
+        Self {
+            uuid_resolver_handle,
+            update_handle,
+            dump_path,
+            db_name,
+        }
+    }
+
+    pub async fn run(self) {
+        if let Err(e) = self.perform_dump().await {
+            error!("{}", e);
+        }
+    }
+
+    async fn perform_dump(&self) -> anyhow::Result<()> {
+        info!("Performing dump.");
+
+        let dump_dir = self.dump_path.clone();
+        fs::create_dir_all(&dump_dir).await?;
+        let temp_dump_dir = spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
+        let temp_dump_path = temp_dump_dir.path().to_owned();
+
+        let uuids = self
+            .uuid_resolver_handle
+            .dump(temp_dump_path.clone())
+            .await?;
+
+        if uuids.is_empty() {
+            return Ok(());
+        }
+
+        let tasks = uuids
+            .iter()
+            .map(|&uuid| self.update_handle.dump(uuid, temp_dump_path.clone()))
+            .collect::<Vec<_>>();
+
+        futures::future::try_join_all(tasks).await?;
+
+        let dump_dir = self.dump_path.clone();
+        let dump_path = self.dump_path.join(format!("{}.dump", self.db_name));
+        let dump_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
+            let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
+            let temp_dump_file_path = temp_dump_file.path().to_owned();
+            compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
+            temp_dump_file.persist(&dump_path)?;
+            Ok(dump_path)
+        })
+        .await??;
+
+        info!("Created dump in {:?}.", dump_path);
+
+        Ok(())
+    }
+}
+
+/// Extract Settings from `settings.json` file present at provided `dir_path`
+fn settings_from_path(dir_path: &Path) -> anyhow::Result<Settings> {
+    let path = dir_path.join("settings.json");
+    let file = File::open(path)?;
+    let reader = std::io::BufReader::new(file);
+    let settings = serde_json::from_reader(reader)?;
+
+    Ok(settings)
+}
+
+/// Write Settings in `settings.json` file at provided `dir_path`
+fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> {
+    let path = dir_path.join("settings.json");
+    let file = File::create(path)?;
+
+    serde_json::to_writer(file, settings)?;
+
+    Ok(())
+}
+
+fn import_index_v1(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
+    std::fs::create_dir_all(&index_path)?;
+    let mut options = EnvOpenOptions::new();
+    options.map_size(size);
+    let index = milli::Index::new(options, index_path)?;
+    let index = Index(Arc::new(index));
+
+    // extract `settings.json` file and import content
+    let settings = settings_from_path(&dump_path)?;
+    let update_builder = UpdateBuilder::new(0);
+    index.update_settings(&settings, update_builder)?;
+
+    let update_builder = UpdateBuilder::new(1);
+    // the documents were exported next to `settings.json`, in the dump directory
+    let file = File::open(&dump_path.join("documents.jsonl"))?;
+    let reader = std::io::BufReader::new(file);
+    index.update_documents(
+        UpdateFormat::JsonStream,
+        IndexDocumentsMethod::ReplaceDocuments,
+        reader,
+        update_builder,
+        None,
+    )?;
+
+    // the last step: we extract the milli::Index and close it
+    Arc::try_unwrap(index.0)
+        .map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
+        .unwrap()
+        .prepare_for_closing()
+        .wait();
+
+    Ok(())
+}
+
+pub fn load_dump(
+    db_path: impl AsRef<Path>,
+    dump_path: impl AsRef<Path>,
+    size: usize,
+) -> anyhow::Result<()> {
+    info!("Importing dump from {}...", dump_path.as_ref().display());
+    let db_path = db_path.as_ref();
+    let dump_path = dump_path.as_ref();
+    let uuid_resolver = uuid_resolver::UuidResolverHandleImpl::new(&db_path)?;
+
+    // extract the dump in a temporary directory
+    let tmp_dir = TempDir::new()?;
+    let tmp_dir_path = tmp_dir.path();
+    compression::from_tar_gz(dump_path, tmp_dir_path)?;
+
+    // read dump metadata
+    let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
+
+    // choose the import function from the DumpVersion in the metadata
+    let import_index = match metadata.dump_version {
+        DumpVersion::V1 => import_index_v1,
+    };
+
+    // delete indexes with the same `uid` as an index to import, and create empty indexes
+    let existing_index_uids = futures::executor::block_on(uuid_resolver.list())?;
+
+    info!("Deleting indexes provided in the dump...");
+    for idx in &metadata.indexes {
+        if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) {
+            // if we find the index in the `uuid_resolver` it's supposed to exist on the file system
+            // and we want to delete it
+            let path = db_path.join(&format!("indexes/index-{}", uuid));
+            info!("Deleting {}", path.display());
+            use std::io::ErrorKind::*;
+            match std::fs::remove_dir_all(path) {
+                Ok(()) => (),
+                // if an index was present in the metadata but missing from the fs we can ignore the
+                // problem because we are going to create it later
+                Err(e) if e.kind() == NotFound => (),
+                Err(e) => bail!(e),
+            }
+        } else {
+            // if the index does not exist in the `uuid_resolver` we create it
+            futures::executor::block_on(uuid_resolver.create(idx.uid.clone()))?;
+        }
+    }
+
+    // import each index's content
+    for idx in metadata.indexes {
+        let dump_path = tmp_dir_path.join(&idx.uid);
+        let uuid = futures::executor::block_on(uuid_resolver.get(idx.uid))?;
+        let index_path = db_path.join(&format!("indexes/index-{}", uuid));
+
+        info!("Importing dump from {} into {}...", dump_path.display(), index_path.display());
+        import_index(size, &dump_path, &index_path)?;
+    }
+
+    info!("Dump importation from {} succeeded", dump_path.display());
+    Ok(())
+}
diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs
index 1f1cf146b..535c405dc 100644
--- a/meilisearch-http/src/index_controller/index_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/index_actor/actor.rs
@@ -36,6 +36,9 @@ impl IndexActor {
         Ok(Self { receiver, update_handler, store })
     }
 
+    /// `run` polls the write_receiver and read_receiver concurrently, but while messages sent
+    /// through the read channel are processed concurrently, the messages sent through the write
+    /// channel are processed one at a time.
pub async fn run(mut self) { let mut receiver = self .receiver @@ -119,6 +122,9 @@ impl IndexActor { Snapshot { uuid, path, ret } => { let _ = ret.send(self.handle_snapshot(uuid, path).await); } + Dump { uuid, path, ret } => { + let _ = ret.send(self.handle_dump(uuid, path).await); + } GetStats { uuid, ret } => { let _ = ret.send(self.handle_get_stats(uuid).await); } @@ -306,7 +312,35 @@ impl IndexActor { Ok(()) } - async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult { + async fn handle_dump(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { + use tokio::fs::create_dir_all; + + path.push("indexes"); + create_dir_all(&path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + + if let Some(index) = self.store.get(uuid).await? { + let mut index_path = path.join(format!("index-{}", uuid)); + create_dir_all(&index_path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + index_path.push("data.mdb"); + spawn_blocking(move || -> anyhow::Result<()> { + // Get write txn to wait for ongoing write transaction before dump. + let _txn = index.write_txn()?; + index.env.copy_to_path(index_path, CompactionOption::Enabled)?; + Ok(()) + }) + .await + .map_err(|e| IndexError::Error(e.into()))? + .map_err(IndexError::Error)?; + } + + Ok(()) + } + + async fn handle_get_stats(&self, uuid: Uuid) -> Result { let index = self .store .get(uuid) diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 4569ea020..d625a763e 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -136,7 +136,14 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { + async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Dump { uuid, path, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) 
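+        // `Dump` is sent on `read_sender`: like other read messages it is processed
+        // concurrently, so an in-progress dump does not serialize behind writes
+        // (see the doc comment on `IndexActor::run`).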
+    }
+
+    async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
         let (ret, receiver) = oneshot::channel();
         let msg = IndexMsg::GetStats { uuid, ret };
         let _ = self.sender.send(msg).await;
diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs
index 4e2824871..0d88532ca 100644
--- a/meilisearch-http/src/index_controller/index_actor/message.rs
+++ b/meilisearch-http/src/index_controller/index_actor/message.rs
@@ -60,6 +60,11 @@ pub enum IndexMsg {
         path: PathBuf,
         ret: oneshot::Sender<IndexResult<()>>,
     },
+    Dump {
+        uuid: Uuid,
+        path: PathBuf,
+        ret: oneshot::Sender<IndexResult<()>>,
+    },
     GetStats {
         uuid: Uuid,
         ret: oneshot::Sender<IndexResult<IndexStats>>,
diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs
index f7f230349..46105742b 100644
--- a/meilisearch-http/src/index_controller/index_actor/mod.rs
+++ b/meilisearch-http/src/index_controller/index_actor/mod.rs
@@ -180,5 +180,4 @@ mod test {
         async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
             self.as_ref().get_index_stats(uuid).await
         }
-
     }
 }
diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs
index f1da36740..10b9142cc 100644
--- a/meilisearch-http/src/index_controller/mod.rs
+++ b/meilisearch-http/src/index_controller/mod.rs
@@ -5,7 +5,6 @@ use std::time::Duration;
 
 use actix_web::web::{Bytes, Payload};
 use anyhow::bail;
-use chrono::{DateTime, Utc};
 use futures::stream::StreamExt;
 use log::info;
 use milli::FieldsDistribution;
@@ -25,6 +24,7 @@ use crate::option::Opt;
 
 mod index_actor;
 mod snapshot;
+mod dump;
 mod update_actor;
 mod update_handler;
 mod updates;
@@ -87,6 +87,13 @@ impl IndexController {
             options.ignore_snapshot_if_db_exists,
             options.ignore_missing_snapshot,
         )?;
+        } else if let Some(ref path) = options.import_dump {
+            load_dump(
+                &options.db_path,
+                path,
+                index_size,
+            )?;
+        }
 
         std::fs::create_dir_all(&path)?;
diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs
index e47edc5bc..7885d0b3b 100644
--- a/meilisearch-http/src/index_controller/update_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/update_actor/actor.rs
@@ -71,11 +71,16 @@ where
                 Some(Delete { uuid, ret }) => {
                     let _ = ret.send(self.handle_delete(uuid).await);
                 }
-                Some(Snapshot { uuids, path, ret }) => {
-                    let _ = ret.send(self.handle_snapshot(uuids, path).await);
+                Some(Snapshot { uuid, path, ret }) => {
+                    let _ = ret.send(self.handle_snapshot(uuid, path).await);
+                }
+                Some(Dump { uuid, path, ret }) => {
+                    let _ = ret.send(self.handle_dump(uuid, path).await);
                 }
                 Some(GetInfo { ret }) => {
                     let _ = ret.send(self.handle_get_info().await);
+                }
+                Some(GetSize { uuid, ret }) => {
+                    let _ = ret.send(self.handle_get_size(uuid).await);
                 }
                 None => break,
             }
@@ -194,9 +199,51 @@ where
     }
 
     async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
-        let store = self.store.clone();
+        let store = self.store.delete(uuid).await?;
 
-        tokio::task::spawn_blocking(move || store.delete_all(uuid))
+        if let Some(store) = store {
+            tokio::task::spawn(async move {
+                let store = get_arc_ownership_blocking(store).await;
+                tokio::task::spawn_blocking(move || {
+                    store.prepare_for_closing().wait();
+                    info!("Update store {} was closed.", uuid);
+                });
+            });
+        }
+
+        Ok(())
+    }
+
+    async fn handle_create(&self, uuid: Uuid) -> Result<()> {
+        let _ = self.store.get_or_create(uuid).await?;
+        Ok(())
+    }
+
+    async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
+        let index_handle = self.index_handle.clone();
+        if let Some(update_store) = self.store.get(uuid).await? {
+            tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
+                // acquire write lock to prevent further writes during snapshot
+                // the update lock must be acquired BEFORE the write lock to prevent deadlock
+                let _lock = update_store.update_lock.lock();
+                let mut txn = update_store.env.write_txn()?;
+
+                // create db snapshot
+                update_store.snapshot(&mut txn, &path, uuid)?;
+
+                futures::executor::block_on(
+                    async move { index_handle.snapshot(uuid, path).await },
+                )?;
+                Ok(())
+            })
             .await
             .map_err(|e| UpdateError::Error(e.into()))?
             .map_err(|e| UpdateError::Error(e.into()))?;
+        }
 
         Ok(())
     }
@@ -245,4 +292,42 @@ where
         Ok(info)
     }
+
+    async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
+        let index_handle = self.index_handle.clone();
+        if let Some(update_store) = self.store.get(uuid).await? {
+            tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
+                // acquire write lock to prevent further writes during the dump
+                // the update lock must be acquired BEFORE the write lock to prevent deadlock
+                let _lock = update_store.update_lock.lock();
+                let mut txn = update_store.env.write_txn()?;
+
+                // create db dump
+                update_store.dump(&mut txn, &path, uuid)?;
+
+                futures::executor::block_on(
+                    async move { index_handle.dump(uuid, path).await },
+                )?;
+                Ok(())
+            })
+            .await
+            .map_err(|e| UpdateError::Error(e.into()))?
+            .map_err(|e| UpdateError::Error(e.into()))?;
+        }
+
+        Ok(())
+    }
+
+    async fn handle_get_size(&self, uuid: Uuid) -> Result<u64> {
+        let size = match self.store.get(uuid).await? {
+            Some(update_store) => tokio::task::spawn_blocking(move || -> anyhow::Result<u64> {
+                let txn = update_store.env.read_txn()?;
+
+                update_store.get_size(&txn)
+            })
+            .await
+            .map_err(|e| UpdateError::Error(e.into()))?
+ .map_err(|e| UpdateError::Error(e.into()))?, + None => 0, + }; } diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index 999481573..569b896b0 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -78,6 +78,20 @@ where receiver.await.expect("update actor killed.") } + async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Dump { uuid, path, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + + async fn get_size(&self, uuid: Uuid) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::GetSize { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + async fn update( &self, meta: UpdateMeta, diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index 17b2b3579..3f39c224f 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -31,7 +31,16 @@ pub enum UpdateMsg { path: PathBuf, ret: oneshot::Sender>, }, + Dump { + uuid: Uuid, + path: PathBuf, + ret: oneshot::Sender>, + }, GetInfo { ret: oneshot::Sender>, }, + GetSize { + uuid: Uuid, + ret: oneshot::Sender>, + }, } diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index e7a12b7ff..4d8ab6f20 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -40,8 +40,11 @@ pub trait UpdateActorHandle { async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; async fn update_status(&self, uuid: Uuid, id: u64) -> Result; async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()>; + async fn create(&self, uuid: Uuid) -> Result<()>; + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; + async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()>; async fn get_info(&self) -> Result; + async fn get_size(&self, uuid: Uuid) -> Result; async fn update( &self, meta: UpdateMeta, diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index 6a916af33..4bc4c8c75 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -499,9 +499,56 @@ impl UpdateStore { Ok(()) } + pub fn dump( + &self, + txn: &mut heed::RwTxn, + path: impl AsRef, + uuid: Uuid, + ) -> anyhow::Result<()> { + let update_path = path.as_ref().join("updates"); + create_dir_all(&update_path)?; + + let mut dump_path = update_path.join(format!("update-{}", uuid)); + // acquire write lock to prevent further writes during dump + create_dir_all(&dump_path)?; + dump_path.push("data.mdb"); + + // create db dump + self.env.copy_to_path(&dump_path, CompactionOption::Enabled)?; + + let update_files_path = update_path.join("update_files"); + create_dir_all(&update_files_path)?; + + for path in self.pending.iter(&txn)? 
{ + let (_, path) = path?; + let name = path.file_name().unwrap(); + let to = update_files_path.join(name); + copy(path, to)?; + } + + Ok(()) + } + pub fn get_info(&self) -> anyhow::Result { let mut size = self.env.size(); let txn = self.env.read_txn()?; + for entry in self.pending_queue.iter(&txn)? { + let (_, pending) = entry?; + if let Some(path) = pending.content_path() { + size += File::open(path)?.metadata()?.len(); + } + } + let processing = match *self.state.read() { + State::Processing(uuid, _) => Some(uuid), + _ => None, + }; + + Ok(UpdateStoreInfo { size, processing }) + } + + pub fn get_size(&self, txn: &heed::RoTxn) -> anyhow::Result { + let mut size = self.env.size(); + let txn = self.env.read_txn()?; for entry in self.pending_queue.iter(&txn)? { let (_, pending) = entry?; diff --git a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs index 253326276..9c180e4a8 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs @@ -41,6 +41,9 @@ impl UuidResolverActor { Some(SnapshotRequest { path, ret }) => { let _ = ret.send(self.handle_snapshot(path).await); } + Some(DumpRequest { path, ret }) => { + let _ = ret.send(self.handle_dump(path).await); + } Some(GetSize { ret }) => { let _ = ret.send(self.handle_get_size().await); } @@ -82,6 +85,10 @@ impl UuidResolverActor { self.store.snapshot(path).await } + async fn handle_dump(&self, path: PathBuf) -> Result> { + self.store.dump(path).await + } + async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { if !is_index_uid_valid(&uid) { return Err(UuidError::BadlyFormatted(uid)); diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs index db4c482bd..e47f9a8e0 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs @@ -68,6 +68,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl { .expect("Uuid resolver actor has been killed")?) } + /// TODO: we should merge this function with the dump function async fn snapshot(&self, path: PathBuf) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::SnapshotRequest { path, ret }; @@ -77,6 +78,15 @@ impl UuidResolverHandle for UuidResolverHandleImpl { .expect("Uuid resolver actor has been killed")?) } + async fn dump(&self, path: PathBuf) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::DumpRequest { path, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) 
+    }
+
     async fn get_size(&self) -> Result<u64> {
         let (ret, receiver) = oneshot::channel();
         let msg = UuidResolveMsg::GetSize { ret };
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/message.rs b/meilisearch-http/src/index_controller/uuid_resolver/message.rs
index a72bf0587..67493c2cd 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/message.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/message.rs
@@ -31,6 +31,10 @@ pub enum UuidResolveMsg {
         path: PathBuf,
         ret: oneshot::Sender<Result<HashSet<Uuid>>>,
     },
+    DumpRequest {
+        path: PathBuf,
+        ret: oneshot::Sender<Result<Vec<Uuid>>>,
+    },
     GetSize {
         ret: oneshot::Sender<Result<u64>>,
     },
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
index ef17133ff..a8361095c 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
@@ -31,6 +31,7 @@ pub trait UuidResolverHandle {
     async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
     async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
     async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
+    async fn dump(&self, path: PathBuf) -> Result<Vec<Uuid>>;
     async fn get_size(&self) -> Result<u64>;
 }
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
index 29c034c44..df4c3a2fb 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
@@ -21,6 +21,7 @@ pub trait UuidStore {
     async fn list(&self) -> Result<Vec<(String, Uuid)>>;
     async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
     async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
+    async fn dump(&self, path: PathBuf) -> Result<Vec<Uuid>>;
     async fn get_size(&self) -> Result<u64>;
 }
 
@@ -130,6 +131,8 @@ impl UuidStore for HeedUuidStore {
         .await?
     }
 
+    // TODO: we should merge this function with the `dump` function below; it's exactly
+    // the same code
     async fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
         let env = self.env.clone();
         let db = self.db;
@@ -155,6 +158,31 @@
         .await?
     }
 
+    async fn dump(&self, mut path: PathBuf) -> Result<Vec<Uuid>> {
+        let env = self.env.clone();
+        let db = self.db;
+        tokio::task::spawn_blocking(move || {
+            // Write transaction to acquire a lock on the database.
+            let txn = env.write_txn()?;
+            let mut entries = Vec::new();
+            for entry in db.iter(&txn)? {
+                let (_, uuid) = entry?;
+                let uuid = Uuid::from_slice(uuid)?;
+                entries.push(uuid);
+            }
+
+            // only perform the dump if there are indexes
+            if !entries.is_empty() {
+                path.push("index_uuids");
+                create_dir_all(&path).unwrap();
+                path.push("data.mdb");
+                env.copy_to_path(path, CompactionOption::Enabled)?;
+            }
+            Ok(entries)
+        })
+        .await?
+    }
+
     async fn get_size(&self) -> Result<u64> {
         Ok(self.env.size())
     }
diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs
index 1997718cc..87238c4d7 100644
--- a/meilisearch-http/src/option.rs
+++ b/meilisearch-http/src/option.rs
@@ -203,6 +203,7 @@ pub struct Opt {
     pub import_dump: Option<PathBuf>,
 
     /// The batch size used in the importation process, the bigger it is the faster the dump is created.
+    /// This option is now deprecated and will be ignored
     #[structopt(long, env = "MEILI_DUMP_BATCH_SIZE", default_value = "1024")]
     pub dump_batch_size: usize,
 
diff --git a/meilisearch-http/src/routes/index.rs b/meilisearch-http/src/routes/index.rs
index 4424c8cfe..1afc01806 100644
--- a/meilisearch-http/src/routes/index.rs
+++ b/meilisearch-http/src/routes/index.rs
@@ -1,5 +1,6 @@
 use actix_web::{delete, get, post, put};
 use actix_web::{web, HttpResponse};
+use chrono::{DateTime, Utc};
 use serde::Deserialize;
 
 use crate::error::ResponseError;
@@ -68,6 +69,16 @@ struct UpdateIndexRequest {
     primary_key: Option<String>,
 }
 
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateIndexResponse {
+    name: String,
+    uid: String,
+    created_at: DateTime<Utc>,
+    updated_at: DateTime<Utc>,
+    primary_key: Option<String>,
+}
+
 #[put("/indexes/{index_uid}", wrap = "Authentication::Private")]
 async fn update_index(
     data: web::Data<Data>,
diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs
index 4230e19f8..e5f51d7f0 100644
--- a/meilisearch-http/tests/settings/get_settings.rs
+++ b/meilisearch-http/tests/settings/get_settings.rs
@@ -19,7 +19,7 @@ async fn get_settings() {
     assert_eq!(settings.keys().len(), 6);
     assert_eq!(settings["displayedAttributes"], json!(["*"]));
     assert_eq!(settings["searchableAttributes"], json!(["*"]));
-    assert_eq!(settings["attributesForFaceting"], json!({}));
+    assert_eq!(settings["attributesForFaceting"], json!(null));
     assert_eq!(settings["distinctAttribute"], json!(null));
     assert_eq!(
         settings["rankingRules"],
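Reviewer note: below is a minimal sketch of how the two halves of this patch are meant to meet, for review purposes only. The module paths, the way the handles are obtained, and the literal path/size values are assumptions; only `DumpService::new`, `DumpService::run`, and `load_dump` come from the diff above.

```rust
// Hypothetical wiring, assuming we are inside `index_controller` and that the
// handles are the ones IndexController::new already constructs.
use std::path::PathBuf;

async fn dump_roundtrip_sketch<R, U>(uuid_resolver: R, update_handle: U) -> anyhow::Result<()>
where
    R: uuid_resolver::UuidResolverHandle,
    U: update_actor::UpdateActorHandle,
{
    // Producing a dump: DumpService resolves every index uuid into a temp dir,
    // asks the update actor to dump each store, then tar-gzips the result into
    // `<dump_path>/<db_name>.dump`.
    let service = dump::DumpService::new(
        uuid_resolver,
        update_handle,
        PathBuf::from("dumps/"), // assumed dump_path
        "data.ms".to_string(),   // assumed db_name
    );
    service.run().await;

    // Consuming a dump: `--import-dump` routes here at startup, before the
    // actors are spawned; the archive is extracted and replayed index by index.
    dump::load_dump("data.ms", "dumps/data.ms.dump", 100 * 1024 * 1024)?;
    Ok(())
}
```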