diff --git a/Cargo.lock b/Cargo.lock
index f1c109a79..26c53663a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,5 +1,7 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
+version = 3
+
 [[package]]
 name = "actix-codec"
 version = "0.4.0"
@@ -1840,8 +1842,8 @@ dependencies = [
 
 [[package]]
 name = "milli"
-version = "0.2.0"
-source = "git+https://github.com/meilisearch/milli.git?tag=v0.2.0#792225eaffce6b3682f9b30b7370b6a547c4757e"
+version = "0.2.1"
+source = "git+https://github.com/meilisearch/milli.git?tag=v0.2.1#25f75d4d03732131e6edcf20f4d126210b159d43"
 dependencies = [
  "anyhow",
  "bstr",
diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml
index 7ac3ecb38..c9f8d63b7 100644
--- a/meilisearch-http/Cargo.toml
+++ b/meilisearch-http/Cargo.toml
@@ -51,7 +51,7 @@ main_error = "0.1.0"
 meilisearch-error = { path = "../meilisearch-error" }
 meilisearch-tokenizer = { git = "https://github.com/meilisearch/Tokenizer.git", tag = "v0.2.2" }
 memmap = "0.7.0"
-milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.2.0" }
+milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.2.1" }
 mime = "0.3.16"
 once_cell = "1.5.2"
 oxidized-json-checker = "0.3.2"
diff --git a/meilisearch-http/build.rs b/meilisearch-http/build.rs
index 5dbde1477..557e04fe7 100644
--- a/meilisearch-http/build.rs
+++ b/meilisearch-http/build.rs
@@ -50,7 +50,7 @@ mod mini_dashboard {
         sha1_file.read_to_string(&mut sha1)?;
         if sha1 == meta["sha1"].as_str().unwrap() {
             // Nothing to do.
-            return Ok(())
+            return Ok(());
         }
     }
 
@@ -62,7 +62,11 @@ mod mini_dashboard {
         hasher.update(&dashboard_assets_bytes);
         let sha1 = hex::encode(hasher.finalize());
 
-        assert_eq!(meta["sha1"].as_str().unwrap(), sha1, "Downloaded mini-dashboard shasum differs from the one specified in the Cargo.toml");
+        assert_eq!(
+            meta["sha1"].as_str().unwrap(),
+            sha1,
+            "Downloaded mini-dashboard shasum differs from the one specified in the Cargo.toml"
+        );
 
         create_dir_all(&dashboard_dir)?;
         let cursor = Cursor::new(&dashboard_assets_bytes);
diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs
index c7979210e..9f8a688bc 100644
--- a/meilisearch-http/src/data/mod.rs
+++ b/meilisearch-http/src/data/mod.rs
@@ -4,8 +4,9 @@ use std::sync::Arc;
 use sha2::Digest;
 
 use crate::index::{Checked, Settings};
-use crate::index_controller::{IndexController, IndexStats, Stats};
-use crate::index_controller::{IndexMetadata, IndexSettings};
+use crate::index_controller::{
+    DumpInfo, IndexController, IndexMetadata, IndexSettings, IndexStats, Stats,
+};
 use crate::option::Opt;
 
 pub mod search;
@@ -68,7 +69,11 @@ impl Data {
 
         api_keys.generate_missing_api_keys();
 
-        let inner = DataInner { index_controller, api_keys, options };
+        let inner = DataInner {
+            index_controller,
+            api_keys,
+            options,
+        };
         let inner = Arc::new(inner);
 
         Ok(Data { inner })
@@ -108,6 +113,14 @@ impl Data {
         Ok(self.index_controller.get_all_stats().await?)
     }
 
+    pub async fn create_dump(&self) -> anyhow::Result<DumpInfo> {
+        Ok(self.index_controller.create_dump().await?)
+    }
+
+    pub async fn dump_status(&self, uid: String) -> anyhow::Result<DumpInfo> {
+        Ok(self.index_controller.dump_info(uid).await?)
+    }
+
     #[inline]
     pub fn http_payload_size_limit(&self) -> usize {
         self.options.http_payload_size_limit.get_bytes() as usize
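The two new `Data` methods are thin async wrappers over the index controller, so the HTTP layer only needs to forward to them. A minimal sketch of what the corresponding actix-web handlers could look like (the route paths and handler names here are illustrative assumptions; the actual route wiring is not part of this diff):

```rust
use actix_web::{get, post, web, HttpResponse};

use crate::Data;

// Hypothetical handlers; this diff only shows the `Data` layer they would call.
#[post("/dumps")]
async fn create_dump(data: web::Data<Data>) -> HttpResponse {
    // Returns immediately with a DumpInfo whose status is `in_progress`.
    match data.create_dump().await {
        Ok(info) => HttpResponse::Accepted().json(info),
        Err(e) => HttpResponse::InternalServerError().body(e.to_string()),
    }
}

#[get("/dumps/{dump_uid}/status")]
async fn dump_status(data: web::Data<Data>, path: web::Path<String>) -> HttpResponse {
    match data.dump_status(path.into_inner()).await {
        Ok(info) => HttpResponse::Ok().json(info),
        Err(e) => HttpResponse::NotFound().body(e.to_string()),
    }
}
```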
diff --git a/meilisearch-http/src/dump.rs b/meilisearch-http/src/dump.rs
deleted file mode 100644
index 544fffaa7..000000000
--- a/meilisearch-http/src/dump.rs
+++ /dev/null
@@ -1,423 +0,0 @@
-use std::fs::{create_dir_all, File};
-use std::io::prelude::*;
-use std::path::{Path, PathBuf};
-use std::sync::Mutex;
-use std::thread;
-
-use actix_web::web;
-use chrono::offset::Utc;
-use indexmap::IndexMap;
-use log::{error, info};
-use once_cell::sync::Lazy;
-use serde::{Deserialize, Serialize};
-use serde_json::json;
-use tempfile::TempDir;
-
-use crate::Data;
-use crate::error::{Error, ResponseError};
-use crate::helpers::compression;
-use crate::routes::index;
-use crate::routes::setting::Settings;
-use crate::routes::index::IndexResponse;
-
-// Mutex to share dump progress.
-static DUMP_INFO: Lazy<Mutex<Option<DumpInfo>>> = Lazy::new(Mutex::default);
-
-#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
-enum DumpVersion {
-    V1,
-}
-
-impl DumpVersion {
-    const CURRENT: Self = Self::V1;
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct DumpMetadata {
-    indexes: Vec<IndexResponse>,
-    db_version: String,
-    dump_version: DumpVersion,
-}
-
-impl DumpMetadata {
-    /// Create a DumpMetadata with the current dump version of meilisearch.
-    pub fn new(indexes: Vec<IndexResponse>, db_version: String) -> Self {
-        DumpMetadata {
-            indexes,
-            db_version,
-            dump_version: DumpVersion::CURRENT,
-        }
-    }
-
-    /// Extract DumpMetadata from the `metadata.json` file present at the provided `dir_path`.
-    fn from_path(dir_path: &Path) -> Result<Self, Error> {
-        let path = dir_path.join("metadata.json");
-        let file = File::open(path)?;
-        let reader = std::io::BufReader::new(file);
-        let metadata = serde_json::from_reader(reader)?;
-
-        Ok(metadata)
-    }
-
-    /// Write DumpMetadata in the `metadata.json` file at the provided `dir_path`.
-    fn to_path(&self, dir_path: &Path) -> Result<(), Error> {
-        let path = dir_path.join("metadata.json");
-        let file = File::create(path)?;
-
-        serde_json::to_writer(file, &self)?;
-
-        Ok(())
-    }
-}
-
-/// Extract Settings from the `settings.json` file present at the provided `dir_path`.
-fn settings_from_path(dir_path: &Path) -> Result<Settings, Error> {
-    let path = dir_path.join("settings.json");
-    let file = File::open(path)?;
-    let reader = std::io::BufReader::new(file);
-    let metadata = serde_json::from_reader(reader)?;
-
-    Ok(metadata)
-}
-
-/// Write Settings in the `settings.json` file at the provided `dir_path`.
-fn settings_to_path(settings: &Settings, dir_path: &Path) -> Result<(), Error> {
-    let path = dir_path.join("settings.json");
-    let file = File::create(path)?;
-
-    serde_json::to_writer(file, settings)?;
-
-    Ok(())
-}
-
-/// Import settings and documents of a dump with version `DumpVersion::V1` in the specified index.
-fn import_index_v1(
-    data: &Data,
-    dumps_dir: &Path,
-    index_uid: &str,
-    document_batch_size: usize,
-    write_txn: &mut MainWriter,
-) -> Result<(), Error> {
-
-    // open index
-    let index = data
-        .db
-        .open_index(index_uid)
-        .ok_or(Error::index_not_found(index_uid))?;
-
-    // index dir path in dump dir
-    let index_path = &dumps_dir.join(index_uid);
-
-    // extract `settings.json` file and import content
-    let settings = settings_from_path(&index_path)?;
-    let settings = settings.to_update().map_err(|e| Error::dump_failed(format!("importing settings for index {}; {}", index_uid, e)))?;
-    apply_settings_update(write_txn, &index, settings)?;
-
-    // create iterator over documents in `documents.jsonl` to make batch importation
-    let documents = {
-        let file = File::open(&index_path.join("documents.jsonl"))?;
-        let reader = std::io::BufReader::new(file);
-        let deserializer = serde_json::Deserializer::from_reader(reader);
-        deserializer.into_iter::<IndexMap<String, serde_json::Value>>()
-    };
-
-    // batch import document every `document_batch_size`:
-    // create a Vec to bufferize documents
-    let mut values = Vec::with_capacity(document_batch_size);
-    // iterate over documents
-    for document in documents {
-        // push document in buffer
-        values.push(document?);
-        // if buffer is full, create and apply a batch, and clean buffer
-        if values.len() == document_batch_size {
-            let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size));
-            apply_documents_addition(write_txn, &index, batch)?;
-        }
-    }
-
-    // apply documents remaining in the buffer
-    if !values.is_empty() {
-        apply_documents_addition(write_txn, &index, values)?;
-    }
-
-    // sync index information: stats, updated_at, last_update
-    if let Err(e) = crate::index_update_callback_txn(index, index_uid, data, write_txn) {
-        return Err(Error::Internal(e));
-    }
-
-    Ok(())
-}
-
-/// Import the dump from `dump_path` in the database.
-pub fn import_dump(
-    data: &Data,
-    dump_path: &Path,
-    document_batch_size: usize,
-) -> Result<(), Error> {
-    info!("Importing dump from {:?}...", dump_path);
-
-    // create a temporary directory
-    let tmp_dir = TempDir::new()?;
-    let tmp_dir_path = tmp_dir.path();
-
-    // extract dump in temporary directory
-    compression::from_tar_gz(dump_path, tmp_dir_path)?;
-
-    // read dump metadata
-    let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
-
-    // choose importation function from DumpVersion of metadata
-    let import_index = match metadata.dump_version {
-        DumpVersion::V1 => import_index_v1,
-    };
-
-    // remove indexes which have same `uid` than indexes to import and create empty indexes
-    let existing_index_uids = data.db.indexes_uids();
-    for index in metadata.indexes.iter() {
-        if existing_index_uids.contains(&index.uid) {
-            data.db.delete_index(index.uid.clone())?;
-        }
-        index::create_index_sync(&data.db, index.uid.clone(), index.name.clone(), index.primary_key.clone())?;
-    }
-
-    // import each indexes content
-    data.db.main_write::<_, _, Error>(|mut writer| {
-        for index in metadata.indexes {
-            import_index(&data, tmp_dir_path, &index.uid, document_batch_size, &mut writer)?;
-        }
-        Ok(())
-    })?;
-
-    info!("Dump importation from {:?} succeeded", dump_path);
-    Ok(())
-}
-
-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
-#[serde(rename_all = "snake_case")]
-pub enum DumpStatus {
-    Done,
-    InProgress,
-    Failed,
-}
-
-#[derive(Debug, Serialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct DumpInfo {
-    pub uid: String,
-    pub status: DumpStatus,
-    #[serde(skip_serializing_if = "Option::is_none", flatten)]
-    pub error: Option<serde_json::Value>,
-}
-
-impl DumpInfo {
-    pub fn new(uid: String, status: DumpStatus) -> Self {
-        Self { uid, status, error: None }
-    }
-
-    pub fn with_error(mut self, error: ResponseError) -> Self {
-        self.status = DumpStatus::Failed;
-        self.error = Some(json!(error));
-
-        self
-    }
-
-    pub fn dump_already_in_progress(&self) -> bool {
-        self.status == DumpStatus::InProgress
-    }
-
-    pub fn get_current() -> Option<Self> {
-        DUMP_INFO.lock().unwrap().clone()
-    }
-
-    pub fn set_current(&self) {
-        *DUMP_INFO.lock().unwrap() = Some(self.clone());
-    }
-}
-
-/// Generate uid from creation date
-fn generate_uid() -> String {
-    Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
-}
-
-/// Infer dumps_dir from dump_uid
-pub fn compressed_dumps_dir(dumps_dir: &Path, dump_uid: &str) -> PathBuf {
-    dumps_dir.join(format!("{}.dump", dump_uid))
-}
-
-/// Write metadata in dump
-fn dump_metadata(data: &web::Data<Data>, dir_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> {
-    let (db_major, db_minor, db_patch) = data.db.version();
-    let metadata = DumpMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch));
-
-    metadata.to_path(dir_path)
-}
-
-/// Export settings of provided index in dump
-fn dump_index_settings(data: &web::Data<Data>, reader: &MainReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
-    let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?;
-
-    settings_to_path(&settings, dir_path)
-}
-
-/// Export updates of provided index in dump
-fn dump_index_updates(data: &web::Data<Data>, reader: &UpdateReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
-    let updates_path = dir_path.join("updates.jsonl");
-    let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?;
-
-    let file = File::create(updates_path)?;
-
-    for update in updates {
-        serde_json::to_writer(&file, &update)?;
-        writeln!(&file)?;
-    }
-
-    Ok(())
-}
-
-/// Export documents of provided index in dump
-fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
-    let documents_path = dir_path.join("documents.jsonl");
-    let file = File::create(documents_path)?;
-    let dump_batch_size = data.dump_batch_size;
-
-    let mut offset = 0;
-    loop {
-        let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, dump_batch_size, None)?;
-        if documents.is_empty() { break; } else { offset += dump_batch_size; }
-
-        for document in documents {
-            serde_json::to_writer(&file, &document)?;
-            writeln!(&file)?;
-        }
-    }
-
-    Ok(())
-}
-
-/// Write error with a context.
-fn fail_dump_process<E: std::error::Error>(dump_info: DumpInfo, context: &str, error: E) {
-    let error_message = format!("{}; {}", context, error);
-
-    error!("Something went wrong during dump process: {}", &error_message);
-    dump_info.with_error(Error::dump_failed(error_message).into()).set_current();
-}
-
-/// Main function of dump.
-fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo) {
-    // open read transaction on Update
-    let update_reader = match data.db.update_read_txn() {
-        Ok(r) => r,
-        Err(e) => {
-            fail_dump_process(dump_info, "creating RO transaction on updates", e);
-            return;
-        }
-    };
-
-    // open read transaction on Main
-    let main_reader = match data.db.main_read_txn() {
-        Ok(r) => r,
-        Err(e) => {
-            fail_dump_process(dump_info, "creating RO transaction on main", e);
-            return;
-        }
-    };
-
-    // create a temporary directory
-    let tmp_dir = match TempDir::new() {
-        Ok(tmp_dir) => tmp_dir,
-        Err(e) => {
-            fail_dump_process(dump_info, "creating temporary directory", e);
-            return;
-        }
-    };
-    let tmp_dir_path = tmp_dir.path();
-
-    // fetch indexes
-    let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) {
-        Ok(indexes) => indexes,
-        Err(e) => {
-            fail_dump_process(dump_info, "listing indexes", e);
-            return;
-        }
-    };
-
-    // create metadata
-    if let Err(e) = dump_metadata(&data, &tmp_dir_path, indexes.clone()) {
-        fail_dump_process(dump_info, "generating metadata", e);
-        return;
-    }
-
-    // export settings, updates and documents for each index
-    for index in indexes {
-        let index_path = tmp_dir_path.join(&index.uid);
-
-        // create index sub-directory
-        if let Err(e) = create_dir_all(&index_path) {
-            fail_dump_process(dump_info, &format!("creating directory for index {}", &index.uid), e);
-            return;
-        }
-
-        // export settings
-        if let Err(e) = dump_index_settings(&data, &main_reader, &index_path, &index.uid) {
-            fail_dump_process(dump_info, &format!("generating settings for index {}", &index.uid), e);
-            return;
-        }
-
-        // export documents
-        if let Err(e) = dump_index_documents(&data, &main_reader, &index_path, &index.uid) {
-            fail_dump_process(dump_info, &format!("generating documents for index {}", &index.uid), e);
-            return;
-        }
-
-        // export updates
-        if let Err(e) = dump_index_updates(&data, &update_reader, &index_path, &index.uid) {
-            fail_dump_process(dump_info, &format!("generating updates for index {}", &index.uid), e);
-            return;
-        }
-    }
-
-    // compress dump in a file named `{dump_uid}.dump` in `dumps_dir`
-    if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_dir(&dumps_dir, &dump_info.uid)) {
-        fail_dump_process(dump_info, "compressing dump", e);
-        return;
-    }
-
-    // update dump info to `done`
-    let resume = DumpInfo::new(
-        dump_info.uid,
-        DumpStatus::Done
-    );
-
-    resume.set_current();
-}
-
-pub fn init_dump_process(data: &web::Data<Data>, dumps_dir: &Path) -> Result<DumpInfo, Error> {
-    create_dir_all(dumps_dir).map_err(|e| Error::dump_failed(format!("creating temporary directory {}", e)))?;
-
-    // check if a dump is already in progress
-    if let Some(resume) = DumpInfo::get_current() {
-        if resume.dump_already_in_progress() {
-            return Err(Error::dump_conflict())
-        }
-    }
-
-    // generate a new dump info
-    let info = DumpInfo::new(
-        generate_uid(),
-        DumpStatus::InProgress
-    );
-
-    info.set_current();
-
-    let data = data.clone();
-    let dumps_dir = dumps_dir.to_path_buf();
-    let info_cloned = info.clone();
-    // run dump process in a new thread
-    thread::spawn(move ||
-        dump_process(data, dumps_dir, info_cloned)
-    );
-
-    Ok(info)
-}
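Both the deleted module above and the new dump actor below rely on `crate::helpers::compression::{to_tar_gz, from_tar_gz}`, which this diff does not show. A minimal sketch of what such helpers typically look like, assuming the `flate2` and `tar` crates (the real module may differ):

```rust
use std::fs::File;
use std::path::Path;

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;

// Archive the contents of `src` into a gzipped tarball at `dest`.
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
    let file = File::create(dest)?;
    let gz = GzEncoder::new(file, Compression::default());
    let mut tar = tar::Builder::new(gz);
    tar.append_dir_all(".", src)?;
    // Finish the tar stream, then flush the gzip encoder.
    tar.into_inner()?.finish()?;
    Ok(())
}

// Extract a gzipped tarball at `src` into the directory `dest`.
pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
    let gz = GzDecoder::new(File::open(src)?);
    let mut archive = tar::Archive::new(gz);
    archive.unpack(dest)?;
    Ok(())
}
```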
diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs
index 6489716ca..07bd96fb9 100644
--- a/meilisearch-http/src/error.rs
+++ b/meilisearch-http/src/error.rs
@@ -299,7 +299,7 @@ impl From<JsonPayloadError> for Error {
             JsonPayloadError::Payload(err) => {
                 Error::BadRequest(format!("Problem while decoding the request: {}", err))
             }
-            e => Error::Internal(format!("Unexpected Json error: {}", e))
+            e => Error::Internal(format!("Unexpected Json error: {}", e)),
         }
     }
 }
@@ -310,7 +310,7 @@ impl From<QueryPayloadError> for Error {
             QueryPayloadError::Deserialize(err) => {
                 Error::BadRequest(format!("Invalid query parameters: {}", err))
             }
-            e => Error::Internal(format!("Unexpected query payload error: {}", e))
+            e => Error::Internal(format!("Unexpected query payload error: {}", e)),
         }
     }
 }
diff --git a/meilisearch-http/src/helpers/authentication.rs b/meilisearch-http/src/helpers/authentication.rs
index a1a0c431e..54d5488f4 100644
--- a/meilisearch-http/src/helpers/authentication.rs
+++ b/meilisearch-http/src/helpers/authentication.rs
@@ -1,16 +1,16 @@
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+use actix_web::body::Body;
 use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
 use actix_web::web;
-use actix_web::body::Body;
-use futures::ready;
-use futures::future::{ok, Future, Ready};
 use actix_web::ResponseError as _;
+use futures::future::{ok, Future, Ready};
+use futures::ready;
 use pin_project::pin_project;
 
-use crate::Data;
 use crate::error::{Error, ResponseError};
+use crate::Data;
 
 #[derive(Clone, Copy)]
 pub enum Authentication {
@@ -59,19 +59,15 @@ where
         let data = req.app_data::<web::Data<Data>>().unwrap();
 
         if data.api_keys().master.is_none() {
-            return AuthenticationFuture::Authenticated(self.service.call(req))
+            return AuthenticationFuture::Authenticated(self.service.call(req));
         }
 
         let auth_header = match req.headers().get("X-Meili-API-Key") {
             Some(auth) => match auth.to_str() {
                 Ok(auth) => auth,
-                Err(_) => {
-                    return AuthenticationFuture::NoHeader(Some(req))
-                }
+                Err(_) => return AuthenticationFuture::NoHeader(Some(req)),
             },
-            None => {
-                return AuthenticationFuture::NoHeader(Some(req))
-            }
+            None => return AuthenticationFuture::NoHeader(Some(req)),
         };
 
         let authenticated = match self.acl {
@@ -111,15 +107,13 @@ where
 {
     type Output = Result<ServiceResponse<Body>, actix_web::Error>;
 
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) ->Poll<Self::Output> {
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.project();
         match this {
-            AuthProj::Authenticated(fut) => {
-                match ready!(fut.poll(cx)) {
-                    Ok(resp) => Poll::Ready(Ok(resp)),
-                    Err(e) => Poll::Ready(Err(e)),
-                }
-            }
+            AuthProj::Authenticated(fut) => match ready!(fut.poll(cx)) {
+                Ok(resp) => Poll::Ready(Ok(resp)),
+                Err(e) => Poll::Ready(Err(e)),
+            },
             AuthProj::NoHeader(req) => {
                 match req.take() {
                     Some(req) => {
@@ -135,7 +129,8 @@ where
             AuthProj::Refused(req) => {
                 match req.take() {
                     Some(req) => {
-                        let bad_token = req.headers()
+                        let bad_token = req
+                            .headers()
                             .get("X-Meili-API-Key")
                             .map(|h| h.to_str().map(String::from).unwrap_or_default())
                             .unwrap_or_default();
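For context, this `Transform` is the middleware that guards routes by ACL. Its registration is not part of this diff; a rough sketch of how such a middleware is typically attached to an actix-web scope (the `Authentication::Private` variant and module path are assumptions):

```rust
use actix_web::{web, HttpResponse};

use crate::helpers::authentication::Authentication;

// Hypothetical wiring: guard private routes with the ACL middleware.
fn configure(cfg: &mut web::ServiceConfig) {
    cfg.service(
        web::scope("/indexes")
            // Every service inside this scope now requires a valid
            // `X-Meili-API-Key` header for the private (or master) key.
            .wrap(Authentication::Private)
            .route("", web::get().to(|| async { HttpResponse::Ok().finish() })),
    );
}
```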
diff --git a/meilisearch-http/src/index/dump.rs b/meilisearch-http/src/index/dump.rs
new file mode 100644
index 000000000..13e6cbc02
--- /dev/null
+++ b/meilisearch-http/src/index/dump.rs
@@ -0,0 +1,132 @@
+use std::fs::{create_dir_all, File};
+use std::io::{BufRead, BufReader, Write};
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, Context};
+use heed::RoTxn;
+use indexmap::IndexMap;
+use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};
+use serde::{Deserialize, Serialize};
+
+use crate::option::IndexerOpts;
+
+use super::{update_handler::UpdateHandler, Index, Settings, Unchecked};
+
+#[derive(Serialize, Deserialize)]
+struct DumpMeta {
+    settings: Settings<Unchecked>,
+    primary_key: Option<String>,
+}
+
+const META_FILE_NAME: &str = "meta.json";
+const DATA_FILE_NAME: &str = "documents.jsonl";
+
+impl Index {
+    pub fn dump(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        // Acquire a write txn to make sure any ongoing write is finished before we start.
+        let txn = self.env.write_txn()?;
+
+        self.dump_documents(&txn, &path)?;
+        self.dump_meta(&txn, &path)?;
+
+        Ok(())
+    }
+
+    fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let document_file_path = path.as_ref().join(DATA_FILE_NAME);
+        let mut document_file = File::create(&document_file_path)?;
+
+        let documents = self.all_documents(txn)?;
+        let fields_ids_map = self.fields_ids_map(txn)?;
+
+        // dump documents
+        let mut json_map = IndexMap::new();
+        for document in documents {
+            let (_, reader) = document?;
+
+            for (fid, bytes) in reader.iter() {
+                if let Some(name) = fields_ids_map.name(fid) {
+                    json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
+                }
+            }
+
+            serde_json::to_writer(&mut document_file, &json_map)?;
+            document_file.write_all(b"\n")?;
+
+            json_map.clear();
+        }
+
+        Ok(())
+    }
+
+    fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let meta_file_path = path.as_ref().join(META_FILE_NAME);
+        let mut meta_file = File::create(&meta_file_path)?;
+
+        let settings = self.settings_txn(txn)?.into_unchecked();
+        let primary_key = self.primary_key(txn)?.map(String::from);
+        let meta = DumpMeta {
+            settings,
+            primary_key,
+        };
+
+        serde_json::to_writer(&mut meta_file, &meta)?;
+
+        Ok(())
+    }
+
+    pub fn load_dump(
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        size: usize,
+        indexing_options: &IndexerOpts,
+    ) -> anyhow::Result<()> {
+        let dir_name = src
+            .as_ref()
+            .file_name()
+            .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
+        let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
+        create_dir_all(&dst_dir_path)?;
+
+        let meta_path = src.as_ref().join(META_FILE_NAME);
+        let mut meta_file = File::open(meta_path)?;
+        let DumpMeta {
+            settings,
+            primary_key,
+        } = serde_json::from_reader(&mut meta_file)?;
+        let settings = settings.check();
+        let index = Self::open(&dst_dir_path, size)?;
+        let mut txn = index.write_txn()?;
+
+        let handler = UpdateHandler::new(&indexing_options)?;
+
+        index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?;
+
+        let document_file_path = src.as_ref().join(DATA_FILE_NAME);
+        let reader = File::open(&document_file_path)?;
+        let mut reader = BufReader::new(reader);
+        reader.fill_buf()?;
+        // If the document file is empty, we don't perform the document addition, to prevent
+        // a primary key error from being thrown.
+        if !reader.buffer().is_empty() {
+            index.update_documents_txn(
+                &mut txn,
+                JsonStream,
+                IndexDocumentsMethod::UpdateDocuments,
+                Some(reader),
+                handler.update_builder(0),
+                primary_key.as_deref(),
+            )?;
+        }
+
+        txn.commit()?;
+
+        match Arc::try_unwrap(index.0) {
+            Ok(inner) => inner.prepare_for_closing().wait(),
+            Err(_) => bail!("Could not close index properly."),
+        }
+
+        Ok(())
+    }
+}
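Each index therefore serializes to exactly two files, `meta.json` and `documents.jsonl`. As a rough illustration of the resulting layout (field values invented; note that `DumpMeta` has no `rename_all`, so `primary_key` stays snake_case while the nested settings are camelCase):

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

// Hypothetical contents produced by `Index::dump` for one index:
//
//   indexes/index-<uuid>/meta.json
//     {"settings":{"displayedAttributes":["*"], ...},"primary_key":"id"}
//   indexes/index-<uuid>/documents.jsonl
//     {"id":1,"title":"Carol"}
//     {"id":2,"title":"Wonder Woman"}
//
// Sketch: every non-empty line of documents.jsonl is one standalone document.
fn count_dumped_documents(index_dir: &Path) -> anyhow::Result<usize> {
    let file = File::open(index_dir.join("documents.jsonl"))?;
    Ok(BufReader::new(file).lines().count())
}
```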
diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs
index c897fac3f..790ac58f0 100644
--- a/meilisearch-http/src/index/mod.rs
+++ b/meilisearch-http/src/index/mod.rs
@@ -1,16 +1,23 @@
-use std::{collections::{BTreeSet, HashSet}, marker::PhantomData};
+use std::collections::{BTreeSet, HashSet};
+use std::fs::create_dir_all;
+use std::marker::PhantomData;
 use std::ops::Deref;
+use std::path::Path;
 use std::sync::Arc;
 
 use anyhow::{bail, Context};
+use heed::{EnvOpenOptions, RoTxn};
 use milli::obkv_to_json;
 use serde_json::{Map, Value};
 
 use crate::helpers::EnvSizer;
 pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
-pub use updates::{Facets, Settings, Checked, Unchecked};
+use serde::{de::Deserializer, Deserialize};
+pub use updates::{Checked, Facets, Settings, Unchecked};
 
+mod dump;
 mod search;
+pub mod update_handler;
 mod updates;
 
 pub type Document = Map<String, Value>;
@@ -26,19 +33,36 @@ impl Deref for Index {
     }
 }
 
+pub fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
+where
+    T: Deserialize<'de>,
+    D: Deserializer<'de>,
+{
+    Deserialize::deserialize(deserializer).map(Some)
+}
+
 impl Index {
+    pub fn open(path: impl AsRef<Path>, size: usize) -> anyhow::Result<Self> {
+        create_dir_all(&path)?;
+        let mut options = EnvOpenOptions::new();
+        options.map_size(size);
+        let index = milli::Index::new(options, &path)?;
+        Ok(Index(Arc::new(index)))
+    }
+
     pub fn settings(&self) -> anyhow::Result<Settings<Checked>> {
         let txn = self.read_txn()?;
+        self.settings_txn(&txn)
+    }
 
+    pub fn settings_txn(&self, txn: &RoTxn) -> anyhow::Result<Settings<Checked>> {
         let displayed_attributes = self
             .displayed_fields(&txn)?
-            .map(|fields| fields.into_iter().map(String::from).collect())
-            .unwrap_or_else(|| vec!["*".to_string()]);
+            .map(|fields| fields.into_iter().map(String::from).collect());
 
         let searchable_attributes = self
             .searchable_fields(&txn)?
-            .map(|fields| fields.into_iter().map(String::from).collect())
-            .unwrap_or_else(|| vec!["*".to_string()]);
+            .map(|fields| fields.into_iter().map(String::from).collect());
 
         let faceted_attributes = self
             .faceted_fields(&txn)?
@@ -62,8 +86,8 @@ impl Index {
         let distinct_attribute = self.distinct_attribute(&txn)?.map(String::from);
 
         Ok(Settings {
-            displayed_attributes: Some(Some(displayed_attributes)),
-            searchable_attributes: Some(Some(searchable_attributes)),
+            displayed_attributes: Some(displayed_attributes),
+            searchable_attributes: Some(searchable_attributes),
             attributes_for_faceting: Some(Some(faceted_attributes)),
             ranking_rules: Some(Some(criteria)),
             stop_words: Some(Some(stop_words)),
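The `deserialize_some` helper moved here is what lets `Settings` distinguish a field that is absent from a field explicitly set to `null`: the outer `Option` encodes presence, the inner one encodes the reset-to-default case. A self-contained illustration of the pattern (standalone, not this crate's types):

```rust
use serde::{de::Deserializer, Deserialize};

fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
    T: Deserialize<'de>,
    D: Deserializer<'de>,
{
    Deserialize::deserialize(deserializer).map(Some)
}

#[derive(Deserialize)]
struct Patch {
    #[serde(default, deserialize_with = "deserialize_some")]
    displayed_attributes: Option<Option<Vec<String>>>,
}

fn main() {
    // Field missing: the default kicks in -> None ("leave unchanged").
    let a: Patch = serde_json::from_str("{}").unwrap();
    assert_eq!(a.displayed_attributes, None);

    // Field set to null -> Some(None) ("reset to default").
    let b: Patch = serde_json::from_str(r#"{"displayed_attributes": null}"#).unwrap();
    assert_eq!(b.displayed_attributes, Some(None));

    // Field set to a value -> Some(Some(value)).
    let c: Patch = serde_json::from_str(r#"{"displayed_attributes": ["title"]}"#).unwrap();
    assert_eq!(c.displayed_attributes, Some(Some(vec!["title".to_string()])));
}
```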
diff --git a/meilisearch-http/src/index/search.rs b/meilisearch-http/src/index/search.rs
index 0ff6c1bc3..bf559eb91 100644
--- a/meilisearch-http/src/index/search.rs
+++ b/meilisearch-http/src/index/search.rs
@@ -90,7 +90,8 @@ impl Index {
         let mut documents = Vec::new();
         let fields_ids_map = self.fields_ids_map(&rtxn).unwrap();
 
-        let displayed_ids = self.displayed_fields_ids(&rtxn)?
+        let displayed_ids = self
+            .displayed_fields_ids(&rtxn)?
             .map(|fields| fields.into_iter().collect::<HashSet<_>>())
             .unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect());
 
@@ -156,10 +157,8 @@ impl Index {
         };
 
         let stop_words = fst::Set::default();
-        let highlighter = Highlighter::new(
-            &stop_words,
-            (String::from("<em>"), String::from("</em>")),
-        );
+        let highlighter =
+            Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
 
         for (_id, obkv) in self.documents(&rtxn, documents_ids)? {
             let document = make_document(&all_attributes, &fields_ids_map, obkv)?;
@@ -384,17 +383,16 @@ mod test {
     #[test]
     fn no_formatted() {
         let stop_words = fst::Set::default();
-        let highlighter = Highlighter::new(
-            &stop_words,
-            (String::from("<em>"), String::from("</em>")),
-        );
+        let highlighter =
+            Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
 
         let mut fields = FieldsIdsMap::new();
         let id = fields.insert("test").unwrap();
 
         let mut buf = Vec::new();
         let mut obkv = obkv::KvWriter::new(&mut buf);
-        obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
+        obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
+            .unwrap();
         obkv.finish().unwrap();
 
         let obkv = obkv::KvReader::new(&buf);
@@ -410,8 +408,9 @@ mod test {
             &highlighter,
             &matching_words,
             &all_formatted,
-            &to_highlight_ids
-        ).unwrap();
+            &to_highlight_ids,
+        )
+        .unwrap();
 
         assert!(value.is_empty());
     }
@@ -419,17 +418,16 @@ mod test {
     #[test]
     fn formatted_no_highlight() {
         let stop_words = fst::Set::default();
-        let highlighter = Highlighter::new(
-            &stop_words,
-            (String::from("<em>"), String::from("</em>")),
-        );
+        let highlighter =
+            Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
 
         let mut fields = FieldsIdsMap::new();
         let id = fields.insert("test").unwrap();
 
         let mut buf = Vec::new();
         let mut obkv = obkv::KvWriter::new(&mut buf);
-        obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
+        obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
+            .unwrap();
         obkv.finish().unwrap();
 
         let obkv = obkv::KvReader::new(&buf);
@@ -445,8 +443,9 @@ mod test {
             &highlighter,
             &matching_words,
             &all_formatted,
-            &to_highlight_ids
-        ).unwrap();
+            &to_highlight_ids,
+        )
+        .unwrap();
 
         assert_eq!(value["test"], "hello");
     }
@@ -454,17 +453,16 @@ mod test {
     #[test]
     fn formatted_with_highlight() {
         let stop_words = fst::Set::default();
-        let highlighter = Highlighter::new(
-            &stop_words,
-            (String::from("<em>"), String::from("</em>")),
-        );
+        let highlighter =
+            Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
 
         let mut fields = FieldsIdsMap::new();
         let id = fields.insert("test").unwrap();
 
         let mut buf = Vec::new();
         let mut obkv = obkv::KvWriter::new(&mut buf);
-        obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
+        obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
+            .unwrap();
         obkv.finish().unwrap();
 
         let obkv = obkv::KvReader::new(&buf);
@@ -480,8 +478,9 @@ mod test {
             &highlighter,
             &matching_words,
             &all_formatted,
-            &to_highlight_ids
-        ).unwrap();
+            &to_highlight_ids,
+        )
+        .unwrap();
 
         assert_eq!(value["test"], "<em>hello</em>");
     }
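The three tests above pin down the highlighter contract: formatting without a match returns the raw value, while a highlighted match is wrapped in the `("<em>", "</em>")` tag pair passed to `Highlighter::new`. A rough sketch of the resulting response shape (field names and values invented, using the conventional `_formatted` key):

```rust
use serde_json::json;

// Hypothetical hit for the query "wonder" with highlighting enabled:
// the raw field is untouched, the formatted copy wraps the match in tags.
fn expected_hit() -> serde_json::Value {
    json!({
        "title": "Wonder Woman",
        "_formatted": {
            "title": "<em>Wonder</em> Woman",
        }
    })
}
```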
diff --git a/meilisearch-http/src/index_controller/update_handler.rs b/meilisearch-http/src/index/update_handler.rs
similarity index 94%
rename from meilisearch-http/src/index_controller/update_handler.rs
rename to meilisearch-http/src/index/update_handler.rs
index d0086aadd..8a127168e 100644
--- a/meilisearch-http/src/index_controller/update_handler.rs
+++ b/meilisearch-http/src/index/update_handler.rs
@@ -38,7 +38,7 @@ impl UpdateHandler {
         })
     }
 
-    fn update_builder(&self, update_id: u64) -> UpdateBuilder {
+    pub fn update_builder(&self, update_id: u64) -> UpdateBuilder {
         // We prepare the update by using the update builder.
         let mut update_builder = UpdateBuilder::new(update_id);
         if let Some(max_nb_chunks) = self.max_nb_chunks {
@@ -82,7 +82,7 @@ impl UpdateHandler {
             ),
             ClearDocuments => index.clear_documents(update_builder),
             DeleteDocuments => index.delete_documents(content, update_builder),
-            Settings(settings) => index.update_settings(settings, update_builder),
+            Settings(settings) => index.update_settings(&settings.clone().check(), update_builder),
        };
 
         match result {
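The `settings.clone().check()` call is the bridge between the two typestates: updates arrive as `Settings<Unchecked>` and must be normalized before they reach milli, notably collapsing the `["*"]` wildcard back into `None` ("all fields"). A condensed, standalone sketch of that normalization, under the assumption that `check()` only rewrites the wildcard fields:

```rust
// Standalone model of the wildcard normalization performed by `check()`
// (not the exact crate code): a `["*"]` coming from the API means
// "all fields", which the engine represents as the inner `None`.
fn check_wildcard(field: Option<Option<Vec<String>>>) -> Option<Option<Vec<String>>> {
    match field {
        Some(Some(fields)) => {
            if fields.iter().any(|f| f == "*") {
                Some(None) // wildcard -> reset to "all fields"
            } else {
                Some(Some(fields))
            }
        }
        otherwise => otherwise,
    }
}
```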
diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs
index 0d76f2ae6..b4869fa42 100644
--- a/meilisearch-http/src/index/updates.rs
+++ b/meilisearch-http/src/index/updates.rs
@@ -1,28 +1,39 @@
 use std::collections::{BTreeSet, HashMap};
 use std::io;
-use std::num::NonZeroUsize;
 use std::marker::PhantomData;
+use std::num::NonZeroUsize;
 
 use flate2::read::GzDecoder;
 use log::info;
 use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
-use serde::{de::Deserializer, Deserialize, Serialize};
+use serde::{Deserialize, Serialize, Serializer};
 
-use super::Index;
 use crate::index_controller::UpdateResult;
 
-#[derive(Clone, Default, Debug)]
+use super::{deserialize_some, Index};
+
+fn serialize_with_wildcard<S>(
+    field: &Option<Option<Vec<String>>>,
+    s: S,
+) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    let wildcard = vec!["*".to_string()];
+    s.serialize_some(&field.as_ref().map(|o| o.as_ref().unwrap_or(&wildcard)))
+}
+
+#[derive(Clone, Default, Debug, Serialize)]
 pub struct Checked;
-#[derive(Clone, Default, Debug)]
+
+#[derive(Clone, Default, Debug, Serialize, Deserialize)]
 pub struct Unchecked;
 
 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
 #[serde(rename_all = "camelCase")]
+#[serde(bound(serialize = "T: Serialize", deserialize = "T: Deserialize<'static>"))]
 pub struct Settings<T> {
     #[serde(
         default,
         deserialize_with = "deserialize_some",
+        serialize_with = "serialize_with_wildcard",
         skip_serializing_if = "Option::is_none"
     )]
     pub displayed_attributes: Option<Option<Vec<String>>>,
@@ -30,11 +41,16 @@ pub struct Settings<T> {
     #[serde(
         default,
         deserialize_with = "deserialize_some",
+        serialize_with = "serialize_with_wildcard",
         skip_serializing_if = "Option::is_none"
     )]
     pub searchable_attributes: Option<Option<Vec<String>>>,
 
-    #[serde(default)]
+    #[serde(
+        default,
+        deserialize_with = "deserialize_some",
+        skip_serializing_if = "Option::is_none"
+    )]
     pub attributes_for_faceting: Option<Option<HashMap<String, String>>>,
 
     #[serde(
@@ -72,6 +88,28 @@ impl Settings<Checked> {
             _kind: PhantomData,
         }
     }
+
+    pub fn into_unchecked(self) -> Settings<Unchecked> {
+        let Self {
+            displayed_attributes,
+            searchable_attributes,
+            attributes_for_faceting,
+            ranking_rules,
+            stop_words,
+            distinct_attribute,
+            ..
+        } = self;
+
+        Settings {
+            displayed_attributes,
+            searchable_attributes,
+            attributes_for_faceting,
+            ranking_rules,
+            stop_words,
+            distinct_attribute,
+            _kind: PhantomData,
+        }
+    }
 }
 
 impl Settings<Unchecked> {
@@ -118,14 +156,6 @@ pub struct Facets {
     pub level_group_size: Option<NonZeroUsize>,
     pub min_level_size: Option<NonZeroUsize>,
 }
 
-fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
-where
-    T: Deserialize<'de>,
-    D: Deserializer<'de>,
-{
-    Deserialize::deserialize(deserializer).map(Some)
-}
-
 impl Index {
     pub fn update_documents(
         &self,
@@ -134,17 +164,37 @@ impl Index {
         content: Option<impl io::Read>,
         update_builder: UpdateBuilder,
         primary_key: Option<&str>,
+    ) -> anyhow::Result<UpdateResult> {
+        let mut txn = self.write_txn()?;
+        let result = self.update_documents_txn(
+            &mut txn,
+            format,
+            method,
+            content,
+            update_builder,
+            primary_key,
+        )?;
+        txn.commit()?;
+        Ok(result)
+    }
+
+    pub fn update_documents_txn<'a, 'b>(
+        &'a self,
+        txn: &mut heed::RwTxn<'a, 'b>,
+        format: UpdateFormat,
+        method: IndexDocumentsMethod,
+        content: Option<impl io::Read>,
+        update_builder: UpdateBuilder,
+        primary_key: Option<&str>,
     ) -> anyhow::Result<UpdateResult> {
         info!("performing document addition");
-        // We must use the write transaction of the update here.
-        let mut wtxn = self.write_txn()?;
 
         // Set the primary key if not set already, ignore if already set.
-        if let (None, Some(ref primary_key)) = (self.primary_key(&wtxn)?, primary_key) {
-            self.put_primary_key(&mut wtxn, primary_key)?;
+        if let (None, Some(ref primary_key)) = (self.primary_key(txn)?, primary_key) {
+            self.put_primary_key(txn, primary_key)?;
         }
 
-        let mut builder = update_builder.index_documents(&mut wtxn, self);
+        let mut builder = update_builder.index_documents(txn, self);
         builder.update_format(format);
         builder.index_documents_method(method);
 
@@ -152,19 +202,17 @@ impl Index {
             |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step);
 
         let gzipped = false;
-        let result = match content {
-            Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback),
-            Some(content) => builder.execute(content, indexing_callback),
-            None => builder.execute(std::io::empty(), indexing_callback),
+        let addition = match content {
+            Some(content) if gzipped => {
+                builder.execute(GzDecoder::new(content), indexing_callback)?
+            }
+            Some(content) => builder.execute(content, indexing_callback)?,
+            None => builder.execute(std::io::empty(), indexing_callback)?,
         };
 
-        info!("document addition done: {:?}", result);
+        info!("document addition done: {:?}", addition);
 
-        result.and_then(|addition_result| {
-            wtxn.commit()
-                .and(Ok(UpdateResult::DocumentsAddition(addition_result)))
-                .map_err(Into::into)
-        })
+        Ok(UpdateResult::DocumentsAddition(addition))
     }
 
     pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
@@ -181,14 +229,14 @@
         }
     }
 
-    pub fn update_settings(
-        &self,
+    pub fn update_settings_txn<'a, 'b>(
+        &'a self,
+        txn: &mut heed::RwTxn<'a, 'b>,
         settings: &Settings<Checked>,
         update_builder: UpdateBuilder,
     ) -> anyhow::Result<UpdateResult> {
         // We must use the write transaction of the update here.
-        let mut wtxn = self.write_txn()?;
-        let mut builder = update_builder.settings(&mut wtxn, self);
+        let mut builder = update_builder.settings(txn, self);
 
         if let Some(ref names) = settings.searchable_attributes {
             match names {
@@ -230,16 +278,22 @@ impl Index {
             }
         }
 
-        let result = builder
-            .execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
+        builder.execute(|indexing_step, update_id| {
+            info!("update {}: {:?}", update_id, indexing_step)
+        })?;
 
-        match result {
-            Ok(()) => wtxn
-                .commit()
-                .and(Ok(UpdateResult::Other))
-                .map_err(Into::into),
-            Err(e) => Err(e),
-        }
+        Ok(UpdateResult::Other)
+    }
+
+    pub fn update_settings(
+        &self,
+        settings: &Settings<Checked>,
+        update_builder: UpdateBuilder,
+    ) -> anyhow::Result<UpdateResult> {
+        let mut txn = self.write_txn()?;
+        let result = self.update_settings_txn(&mut txn, settings, update_builder)?;
+        txn.commit()?;
+        Ok(result)
     }
 
     pub fn delete_documents(
         &self,
@@ -288,7 +342,10 @@ mod test {
         let checked = settings.clone().check();
         assert_eq!(settings.displayed_attributes, checked.displayed_attributes);
-        assert_eq!(settings.searchable_attributes, checked.searchable_attributes);
+        assert_eq!(
+            settings.searchable_attributes,
+            checked.searchable_attributes
+        );
 
         // test wildcard
         // test no changes
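Splitting each operation into a `*_txn` variant plus a thin committing wrapper is what allows the dump loaders to apply settings and documents atomically: both run against the same `RwTxn`, and a single `commit` publishes them together. Schematically (a sketch over the API introduced above, assuming the surrounding types from this diff):

```rust
use milli::update::{IndexDocumentsMethod, UpdateFormat};

use crate::index::{update_handler::UpdateHandler, Checked, Index, Settings};

// Sketch: one write transaction spans both updates, so a crash mid-import
// leaves no half-applied index behind.
fn import_atomically(
    index: &Index,
    settings: &Settings<Checked>,
    documents: impl std::io::Read,
    handler: &UpdateHandler,
) -> anyhow::Result<()> {
    let mut txn = index.write_txn()?;

    // Both operations write into the same uncommitted transaction...
    index.update_settings_txn(&mut txn, settings, handler.update_builder(0))?;
    index.update_documents_txn(
        &mut txn,
        UpdateFormat::JsonStream,
        IndexDocumentsMethod::UpdateDocuments,
        Some(documents),
        handler.update_builder(0),
        None,
    )?;

    // ...and become visible only here.
    txn.commit()?;
    Ok(())
}
```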
diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs
new file mode 100644
index 000000000..c78079de6
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs
@@ -0,0 +1,156 @@
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use async_stream::stream;
+use chrono::Utc;
+use futures::{lock::Mutex, stream::StreamExt};
+use log::{error, info};
+use tokio::sync::{mpsc, oneshot, RwLock};
+use update_actor::UpdateActorHandle;
+use uuid_resolver::UuidResolverHandle;
+
+use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask};
+use crate::index_controller::{update_actor, uuid_resolver};
+
+pub const CONCURRENT_DUMP_MSG: usize = 10;
+
+pub struct DumpActor<UuidResolver, Update> {
+    inbox: Option<mpsc::Receiver<DumpMsg>>,
+    uuid_resolver: UuidResolver,
+    update: Update,
+    dump_path: PathBuf,
+    lock: Arc<Mutex<()>>,
+    dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
+    update_db_size: usize,
+    index_db_size: usize,
+}
+
+/// Generate uid from creation date
+fn generate_uid() -> String {
+    Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
+}
+
+impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
+where
+    UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static,
+    Update: UpdateActorHandle + Send + Sync + Clone + 'static,
+{
+    pub fn new(
+        inbox: mpsc::Receiver<DumpMsg>,
+        uuid_resolver: UuidResolver,
+        update: Update,
+        dump_path: impl AsRef<Path>,
+        index_db_size: usize,
+        update_db_size: usize,
+    ) -> Self {
+        let dump_infos = Arc::new(RwLock::new(HashMap::new()));
+        let lock = Arc::new(Mutex::new(()));
+        Self {
+            inbox: Some(inbox),
+            uuid_resolver,
+            update,
+            dump_path: dump_path.as_ref().into(),
+            dump_infos,
+            lock,
+            index_db_size,
+            update_db_size,
+        }
+    }
+
+    pub async fn run(mut self) {
+        info!("Started dump actor.");
+
+        let mut inbox = self
+            .inbox
+            .take()
+            .expect("Dump Actor must have an inbox at this point.");
+
+        let stream = stream! {
+            loop {
+                match inbox.recv().await {
+                    Some(msg) => yield msg,
+                    None => break,
+                }
+            }
+        };
+
+        stream
+            .for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg))
+            .await;
+
+        error!("Dump actor stopped.");
+    }
+
+    async fn handle_message(&self, msg: DumpMsg) {
+        use DumpMsg::*;
+
+        match msg {
+            CreateDump { ret } => {
+                let _ = self.handle_create_dump(ret).await;
+            }
+            DumpInfo { ret, uid } => {
+                let _ = ret.send(self.handle_dump_info(uid).await);
+            }
+        }
+    }
+
+    async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
+        let uid = generate_uid();
+        let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
+
+        let _lock = match self.lock.try_lock() {
+            Some(lock) => lock,
+            None => {
+                ret.send(Err(DumpError::DumpAlreadyRunning))
+                    .expect("Dump actor is dead");
+                return;
+            }
+        };
+
+        self.dump_infos
+            .write()
+            .await
+            .insert(uid.clone(), info.clone());
+
+        ret.send(Ok(info)).expect("Dump actor is dead");
+
+        let task = DumpTask {
+            path: self.dump_path.clone(),
+            uuid_resolver: self.uuid_resolver.clone(),
+            update_handle: self.update.clone(),
+            uid: uid.clone(),
+            update_db_size: self.update_db_size,
+            index_db_size: self.index_db_size,
+        };
+
+        let task_result = tokio::task::spawn(task.run()).await;
+
+        let mut dump_infos = self.dump_infos.write().await;
+        let dump_infos = dump_infos
+            .get_mut(&uid)
+            .expect("dump entry deleted while lock was acquired");
+
+        match task_result {
+            Ok(Ok(())) => {
+                dump_infos.done();
+                info!("Dump succeeded");
+            }
+            Ok(Err(e)) => {
+                dump_infos.with_error(e.to_string());
+                error!("Dump failed: {}", e);
+            }
+            Err(_) => {
+                dump_infos.with_error("Unexpected error while performing dump.".to_string());
+                error!("Dump panicked. Dump status set to failed");
+            }
+        };
+    }
+
+    async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
+        match self.dump_infos.read().await.get(&uid) {
+            Some(info) => Ok(info.clone()),
+            _ => Err(DumpError::DumpDoesNotExist(uid)),
+        }
+    }
+}
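Note how exclusivity is enforced: messages are processed with `for_each_concurrent`, so two `CreateDump` requests can race, but only the one that wins `try_lock` proceeds, and the guard lives until `handle_create_dump` returns, i.e. until the spawned `DumpTask` has been awaited. A stripped-down model of that gate (standalone, using the same `futures::lock::Mutex`):

```rust
use std::sync::Arc;

use futures::lock::Mutex;

// Standalone model of the single-dump gate used above.
async fn try_start_dump(lock: Arc<Mutex<()>>) -> Result<(), &'static str> {
    // `try_lock` returns `None` instead of waiting when a dump is running.
    let _guard = match lock.try_lock() {
        Some(guard) => guard,
        None => return Err("dump already running"),
    };

    // ... perform the dump while `_guard` is held ...
    // The guard drops here, letting the next dump start.
    Ok(())
}
```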
diff --git a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs
new file mode 100644
index 000000000..ab91aeae6
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs
@@ -0,0 +1,52 @@
+use std::path::Path;
+
+use actix_web::web::Bytes;
+use tokio::sync::{mpsc, oneshot};
+
+use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult};
+
+#[derive(Clone)]
+pub struct DumpActorHandleImpl {
+    sender: mpsc::Sender<DumpMsg>,
+}
+
+#[async_trait::async_trait]
+impl DumpActorHandle for DumpActorHandleImpl {
+    async fn create_dump(&self) -> DumpResult<DumpInfo> {
+        let (ret, receiver) = oneshot::channel();
+        let msg = DumpMsg::CreateDump { ret };
+        let _ = self.sender.send(msg).await;
+        receiver.await.expect("IndexActor has been killed")
+    }
+
+    async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
+        let (ret, receiver) = oneshot::channel();
+        let msg = DumpMsg::DumpInfo { ret, uid };
+        let _ = self.sender.send(msg).await;
+        receiver.await.expect("IndexActor has been killed")
+    }
+}
+
+impl DumpActorHandleImpl {
+    pub fn new(
+        path: impl AsRef<Path>,
+        uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
+        update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>,
+        index_db_size: usize,
+        update_db_size: usize,
+    ) -> anyhow::Result<Self> {
+        let (sender, receiver) = mpsc::channel(10);
+        let actor = DumpActor::new(
+            receiver,
+            uuid_resolver,
+            update,
+            path,
+            index_db_size,
+            update_db_size,
+        );
+
+        tokio::task::spawn(actor.run());
+
+        Ok(Self { sender })
+    }
+}
diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs
new file mode 100644
index 000000000..ae6adc7cf
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs
@@ -0,0 +1,2 @@
+pub mod v1;
+pub mod v2;
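The two loader modules correspond to the two on-disk metadata formats. As defined in `dump_actor/mod.rs` below, `Metadata` is an internally tagged enum (`#[serde(tag = "dumpVersion")]`), so the loader to use is picked straight from the `metadata.json` header. A rough stand-in showing the shapes being distinguished (field values invented):

```rust
use serde::Deserialize;

// Stand-ins for MetadataV1/MetadataV2, reduced to the fields needed to
// show the dispatch (camelCase renaming as in the real structs).
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct V1 { db_version: String }

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct V2 { db_version: String, index_db_size: usize, update_db_size: usize }

#[derive(Deserialize, Debug)]
#[serde(tag = "dumpVersion")]
enum Metadata { V1(V1), V2(V2) }

fn main() {
    let raw = r#"{"dumpVersion":"V2","dbVersion":"0.21.0",
                  "indexDbSize":104857600,"updateDbSize":104857600}"#;
    let meta: Metadata = serde_json::from_str(raw).unwrap();
    // Dispatches to the V2 loader without peeking at the payload manually.
    println!("{:?}", meta);
}
```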
diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs
new file mode 100644
index 000000000..decd67f87
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs
@@ -0,0 +1,183 @@
+use std::collections::{BTreeMap, BTreeSet};
+use std::fs::{create_dir_all, File};
+use std::io::BufRead;
+use std::marker::PhantomData;
+use std::path::Path;
+use std::sync::Arc;
+
+use heed::EnvOpenOptions;
+use log::{error, info, warn};
+use milli::update::{IndexDocumentsMethod, UpdateFormat};
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata};
+use crate::{
+    index::{deserialize_some, update_handler::UpdateHandler, Index, Unchecked},
+    option::IndexerOpts,
+};
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct MetadataV1 {
+    db_version: String,
+    indexes: Vec<IndexMetadata>,
+}
+
+impl MetadataV1 {
+    pub fn load_dump(
+        self,
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        size: usize,
+        indexer_options: &IndexerOpts,
+    ) -> anyhow::Result<()> {
+        info!(
+            "Loading dump, dump database version: {}, dump version: V1",
+            self.db_version
+        );
+
+        let uuid_store = HeedUuidStore::new(&dst)?;
+        for index in self.indexes {
+            let uuid = Uuid::new_v4();
+            uuid_store.insert(index.uid.clone(), uuid)?;
+            let src = src.as_ref().join(index.uid);
+            load_index(
+                &src,
+                &dst,
+                uuid,
+                index.meta.primary_key.as_deref(),
+                size,
+                indexer_options,
+            )?;
+        }
+
+        Ok(())
+    }
+}
+
+// These are the settings used in legacy meilisearch (<v0.21.0).
+#[derive(Default, Clone, Serialize, Deserialize, Debug)]
+#[serde(rename_all = "camelCase", deny_unknown_fields)]
+struct Settings {
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub ranking_rules: Option<Option<Vec<String>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub distinct_attribute: Option<Option<String>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub searchable_attributes: Option<Option<Vec<String>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub displayed_attributes: Option<Option<Vec<String>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub stop_words: Option<Option<BTreeSet<String>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub attributes_for_faceting: Option<Option<Vec<String>>>,
+}
+
+fn load_index(
+    src: impl AsRef<Path>,
+    dst: impl AsRef<Path>,
+    uuid: Uuid,
+    primary_key: Option<&str>,
+    size: usize,
+    indexer_options: &IndexerOpts,
+) -> anyhow::Result<()> {
+    let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
+
+    create_dir_all(&index_path)?;
+    let mut options = EnvOpenOptions::new();
+    options.map_size(size);
+    let index = milli::Index::new(options, index_path)?;
+    let index = Index(Arc::new(index));
+
+    // extract `settings.json` file and import content
+    let settings = import_settings(&src)?;
+    let settings: index_controller::Settings<Unchecked> = settings.into();
+
+    let mut txn = index.write_txn()?;
+
+    let handler = UpdateHandler::new(&indexer_options)?;
+
+    index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?;
+
+    let file = File::open(&src.as_ref().join("documents.jsonl"))?;
+    let mut reader = std::io::BufReader::new(file);
+    reader.fill_buf()?;
+    if !reader.buffer().is_empty() {
+        index.update_documents_txn(
+            &mut txn,
+            UpdateFormat::JsonStream,
+            IndexDocumentsMethod::ReplaceDocuments,
+            Some(reader),
+            handler.update_builder(0),
+            primary_key,
+        )?;
+    }
+
+    txn.commit()?;
+
+    // Finally, we extract the original milli::Index and close it
+    Arc::try_unwrap(index.0)
+        .map_err(|_e| "Couldn't close the index properly")
+        .unwrap()
+        .prepare_for_closing()
+        .wait();
+
+    // Updates are ignored in dumps V1.
+
+    Ok(())
+}
+
+/// We need to **always** be able to convert the old settings to the settings currently being used.
+impl From<Settings> for index_controller::Settings<Unchecked> {
+    fn from(settings: Settings) -> Self {
+        if settings.synonyms.flatten().is_some() {
+            error!("`synonyms` are not yet implemented and thus will be ignored");
+        }
+        Self {
+            distinct_attribute: settings.distinct_attribute,
+            // we need to convert the old `Vec<String>` into a `BTreeSet<String>`
+            displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())),
+            searchable_attributes: settings.searchable_attributes,
+            // we previously had a `Vec<String>` but now we have a `HashMap<String, String>`
+            // representing the name of the faceted field + the type of the field. Since the type
+            // was not known in the V1 of the dump we are just going to assume everything is a
+            // String
+            attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())),
+            // we need to convert the old `Vec<String>` into a `BTreeSet<String>`
+            ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| {
+                match criterion.as_str() {
+                    "words" | "typo" | "proximity" | "attribute" => Some(criterion),
+                    s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
+                    "wordsPosition" => {
+                        warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPosition` will be ignored");
+                        Some(String::from("words"))
+                    }
+                    "exactness" => {
+                        error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion);
+                        None
+                    }
+                    s => {
+                        error!("Unknown criterion found in the dump: `{}`, it will be ignored", s);
+                        None
+                    }
+                }
+            }).collect())),
+            // we need to convert the old `Vec<String>` into a `BTreeSet<String>`
+            stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())),
+            _kind: PhantomData,
+        }
+    }
+}
+
+/// Extract Settings from the `settings.json` file present at the provided `dir_path`.
+fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
+    let path = dir_path.as_ref().join("settings.json");
+    let file = File::open(path)?;
+    let reader = std::io::BufReader::new(file);
+    let metadata = serde_json::from_reader(reader)?;
+
+    Ok(metadata)
+}
diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs
new file mode 100644
index 000000000..eddd8a3b7
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs
@@ -0,0 +1,59 @@
+use std::path::Path;
+
+use chrono::{DateTime, Utc};
+use log::info;
+use serde::{Deserialize, Serialize};
+
+use crate::index::Index;
+use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore};
+use crate::option::IndexerOpts;
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct MetadataV2 {
+    db_version: String,
+    index_db_size: usize,
+    update_db_size: usize,
+    dump_date: DateTime<Utc>,
+}
+
+impl MetadataV2 {
+    pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
+        Self {
+            db_version: env!("CARGO_PKG_VERSION").to_string(),
+            index_db_size,
+            update_db_size,
+            dump_date: Utc::now(),
+        }
+    }
+
+    pub fn load_dump(
+        self,
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        index_db_size: usize,
+        update_db_size: usize,
+        indexing_options: &IndexerOpts,
+    ) -> anyhow::Result<()> {
+        info!(
+            "Loading dump from {}, dump database version: {}, dump version: V2",
+            self.dump_date, self.db_version
+        );
+
+        info!("Loading index database.");
+        HeedUuidStore::load_dump(src.as_ref(), &dst)?;
+
+        info!("Loading updates.");
+        UpdateStore::load_dump(&src, &dst, update_db_size)?;
+
+        info!("Loading indexes.");
+        let indexes_path = src.as_ref().join("indexes");
+        let indexes = indexes_path.read_dir()?;
+        for index in indexes {
+            let index = index?;
+            Index::load_dump(&index.path(), &dst, index_db_size, indexing_options)?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/meilisearch-http/src/index_controller/dump_actor/message.rs b/meilisearch-http/src/index_controller/dump_actor/message.rs
new file mode 100644
index 000000000..dff9f5954
--- /dev/null
+++ b/meilisearch-http/src/index_controller/dump_actor/message.rs
@@ -0,0 +1,13 @@
+use tokio::sync::oneshot;
+
+use super::{DumpInfo, DumpResult};
+
+pub enum DumpMsg {
+    CreateDump {
+        ret: oneshot::Sender<DumpResult<DumpInfo>>,
+    },
+    DumpInfo {
+        uid: String,
+        ret: oneshot::Sender<DumpResult<DumpInfo>>,
+    },
+}
"camelCase")] +pub struct DumpInfo { + pub uid: String, + pub status: DumpStatus, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + started_at: DateTime, + #[serde(skip_serializing_if = "Option::is_none")] + finished_at: Option>, +} + +impl DumpInfo { + pub fn new(uid: String, status: DumpStatus) -> Self { + Self { + uid, + status, + error: None, + started_at: Utc::now(), + finished_at: None, + } + } + + pub fn with_error(&mut self, error: String) { + self.status = DumpStatus::Failed; + self.finished_at = Some(Utc::now()); + self.error = Some(error); + } + + pub fn done(&mut self) { + self.finished_at = Some(Utc::now()); + self.status = DumpStatus::Done; + } + + pub fn dump_already_in_progress(&self) -> bool { + self.status == DumpStatus::InProgress + } +} + +pub fn load_dump( + dst_path: impl AsRef, + src_path: impl AsRef, + index_db_size: usize, + update_db_size: usize, + indexer_opts: &IndexerOpts, +) -> anyhow::Result<()> { + let tmp_src = tempfile::tempdir_in(".")?; + let tmp_src_path = tmp_src.path(); + + compression::from_tar_gz(&src_path, tmp_src_path)?; + + let meta_path = tmp_src_path.join(META_FILE_NAME); + let mut meta_file = File::open(&meta_path)?; + let meta: Metadata = serde_json::from_reader(&mut meta_file)?; + + let dst_dir = dst_path + .as_ref() + .parent() + .with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?; + + let tmp_dst = tempfile::tempdir_in(dst_dir)?; + + match meta { + Metadata::V1(meta) => { + meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)? + } + Metadata::V2(meta) => meta.load_dump( + &tmp_src_path, + tmp_dst.path(), + index_db_size, + update_db_size, + indexer_opts, + )?, + } + // Persist and atomically rename the db + let persisted_dump = tmp_dst.into_path(); + if dst_path.as_ref().exists() { + warn!("Overwriting database at {}", dst_path.as_ref().display()); + std::fs::remove_dir_all(&dst_path)?; + } + + std::fs::rename(&persisted_dump, &dst_path)?; + + Ok(()) +} + +struct DumpTask { + path: PathBuf, + uuid_resolver: U, + update_handle: P, + uid: String, + update_db_size: usize, + index_db_size: usize, +} + +impl DumpTask +where + U: UuidResolverHandle + Send + Sync + Clone + 'static, + P: UpdateActorHandle + Send + Sync + Clone + 'static, +{ + async fn run(self) -> anyhow::Result<()> { + info!("Performing dump."); + + create_dir_all(&self.path).await?; + + let path_clone = self.path.clone(); + let temp_dump_dir = + tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); + + let meta = Metadata::new_v2(self.index_db_size, self.update_db_size); + let meta_path = temp_dump_path.join(META_FILE_NAME); + let mut meta_file = File::create(&meta_path)?; + serde_json::to_writer(&mut meta_file, &meta)?; + + let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?; + + self.update_handle + .dump(uuids, temp_dump_path.clone()) + .await?; + + let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { + let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; + compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?; + + let dump_path = self.path.join(self.uid).with_extension("dump"); + temp_dump_file.persist(&dump_path)?; + + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 
diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs
index 1f1cf146b..31e2a58d4 100644
--- a/meilisearch-http/src/index_controller/index_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/index_actor/actor.rs
@@ -6,14 +6,15 @@ use async_stream::stream;
 use futures::stream::StreamExt;
 use heed::CompactionOption;
 use log::debug;
-use tokio::sync::mpsc;
 use tokio::task::spawn_blocking;
+use tokio::{fs, sync::mpsc};
 use uuid::Uuid;
 
-use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
+use crate::index::{
+    update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings,
+};
 use crate::index_controller::{
-    get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed,
-    Processing,
+    get_arc_ownership_blocking, Failed, IndexStats, Processed, Processing,
 };
 use crate::option::IndexerOpts;
 
@@ -30,12 +31,19 @@ pub struct IndexActor<S> {
 impl<S: IndexStore + Sync + Send> IndexActor<S> {
     pub fn new(receiver: mpsc::Receiver<IndexMsg>, store: S) -> IndexResult<Self> {
         let options = IndexerOpts::default();
-        let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?;
+        let update_handler = UpdateHandler::new(&options)?;
         let update_handler = Arc::new(update_handler);
         let receiver = Some(receiver);
-        Ok(Self { receiver, update_handler, store })
+        Ok(Self {
+            receiver,
+            update_handler,
+            store,
+        })
     }
 
+    /// `run` polls the write_receiver and read_receiver concurrently, but while messages sent
+    /// through the read channel are processed concurrently, the messages sent through the write
+    /// channel are processed one at a time.
     pub async fn run(mut self) {
         let mut receiver = self
             .receiver
@@ -119,6 +127,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             Snapshot { uuid, path, ret } => {
                 let _ = ret.send(self.handle_snapshot(uuid, path).await);
             }
+            Dump { uuid, path, ret } => {
+                let _ = ret.send(self.handle_dump(uuid, path).await);
+            }
             GetStats { uuid, ret } => {
                 let _ = ret.send(self.handle_get_stats(uuid).await);
             }
@@ -140,9 +151,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
         primary_key: Option<String>,
     ) -> IndexResult<IndexMeta> {
         let index = self.store.create(uuid, primary_key).await?;
-        let meta = spawn_blocking(move || IndexMeta::new(&index))
-            .await
-            .map_err(|e| IndexError::Error(e.into()))??;
+        let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
         Ok(meta)
     }
 
@@ -159,9 +168,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             None => self.store.create(uuid, None).await?,
         };
 
-        spawn_blocking(move || update_handler.handle_update(meta, data, index))
-            .await
-            .map_err(|e| IndexError::Error(e.into()))
+        let result =
+            spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?;
+        Ok(result)
     }
 
     async fn handle_settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>> {
@@ -170,9 +179,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             .get(uuid)
             .await?
             .ok_or(IndexError::UnexistingIndex)?;
-        spawn_blocking(move || index.settings().map_err(IndexError::Error))
-            .await
-            .map_err(|e| IndexError::Error(e.into()))?
+        let result = spawn_blocking(move || index.settings()).await??;
+        Ok(result)
     }
 
     async fn handle_fetch_documents(
@@ -187,13 +195,11 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             .get(uuid)
             .await?
             .ok_or(IndexError::UnexistingIndex)?;
-        spawn_blocking(move || {
-            index
-                .retrieve_documents(offset, limit, attributes_to_retrieve)
-                .map_err(IndexError::Error)
-        })
-        .await
-        .map_err(|e| IndexError::Error(e.into()))?
+        let result =
+            spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve))
+                .await??;
+
+        Ok(result)
     }
 
     async fn handle_fetch_document(
@@ -207,13 +213,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             .get(uuid)
             .await?
.ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_document(doc_id, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? + + let result = + spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)) + .await??; + + Ok(result) } async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> { @@ -236,9 +241,7 @@ impl IndexActor { async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult { match self.store.get(uuid).await? { Some(index) => { - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; + let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; Ok(meta) } None => Err(IndexError::UnexistingIndex), @@ -256,7 +259,7 @@ impl IndexActor { .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || match index_settings.primary_key { + let result = spawn_blocking(move || match index_settings.primary_key { Some(ref primary_key) => { let mut txn = index.write_txn()?; if index.primary_key(&txn)?.is_some() { @@ -272,23 +275,22 @@ impl IndexActor { Ok(meta) } }) - .await - .map_err(|e| IndexError::Error(e.into()))? + .await??; + + Ok(result) } async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> { use tokio::fs::create_dir_all; path.push("indexes"); - create_dir_all(&path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + create_dir_all(&path).await?; if let Some(index) = self.store.get(uuid).await? { let mut index_path = path.join(format!("index-{}", uuid)); - create_dir_all(&index_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + + create_dir_all(&index_path).await?; + index_path.push("data.mdb"); spawn_blocking(move || -> anyhow::Result<()> { // Get write txn to wait for ongoing write transaction before snapshot. @@ -298,14 +300,29 @@ impl IndexActor { .copy_to_path(index_path, CompactionOption::Enabled)?; Ok(()) }) - .await - .map_err(|e| IndexError::Error(e.into()))? - .map_err(IndexError::Error)?; + .await??; } Ok(()) } + /// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the + /// documents and all the settings. + async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + + let path = path.join(format!("indexes/index-{}/", uuid)); + fs::create_dir_all(&path).await?; + + tokio::task::spawn_blocking(move || index.dump(path)).await??; + + Ok(()) + } + async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult { let index = self .store @@ -323,7 +340,6 @@ impl IndexActor { fields_distribution: index.fields_distribution(&rtxn)?, }) }) - .await - .map_err(|e| IndexError::Error(e.into()))? + .await? 
} } diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 4569ea020..6bf83c647 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -3,7 +3,10 @@ use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use crate::{index::Checked, index_controller::{IndexSettings, IndexStats, Processing}}; +use crate::{ + index::Checked, + index_controller::{IndexSettings, IndexStats, Processing}, +}; use crate::{ index::{Document, SearchQuery, SearchResult, Settings}, index_controller::{Failed, Processed}, @@ -136,6 +139,13 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Dump { uuid, path, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetStats { uuid, ret }; diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index 4e2824871..377b2c333 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use tokio::sync::oneshot; use uuid::Uuid; -use crate::index::{Document, SearchQuery, SearchResult, Settings, Checked}; +use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index_controller::{Failed, IndexStats, Processed, Processing}; use super::{IndexMeta, IndexResult, IndexSettings}; @@ -60,6 +60,11 @@ pub enum IndexMsg { path: PathBuf, ret: oneshot::Sender>, }, + Dump { + uuid: Uuid, + path: PathBuf, + ret: oneshot::Sender>, + }, GetStats { uuid: Uuid, ret: oneshot::Sender>, diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index f7f230349..1ddc0199e 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -15,7 +15,7 @@ use message::IndexMsg; use store::{IndexStore, MapIndexStore}; use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{Failed, Processed, Processing, IndexStats}; +use crate::index_controller::{Failed, IndexStats, Processed, Processing}; use super::IndexSettings; @@ -31,7 +31,7 @@ pub type IndexResult = std::result::Result; pub struct IndexMeta { created_at: DateTime, pub updated_at: DateTime, - primary_key: Option, + pub primary_key: Option, } impl IndexMeta { @@ -44,24 +44,45 @@ impl IndexMeta { let created_at = index.created_at(&txn)?; let updated_at = index.updated_at(&txn)?; let primary_key = index.primary_key(&txn)?.map(String::from); - Ok(Self { created_at, updated_at, primary_key }) + Ok(Self { + created_at, + updated_at, + primary_key, + }) } } #[derive(Error, Debug)] pub enum IndexError { - #[error("error with index: {0}")] - Error(#[from] anyhow::Error), #[error("index already exists")] IndexAlreadyExists, #[error("Index doesn't exists")] UnexistingIndex, - #[error("Heed error: {0}")] - HeedError(#[from] heed::Error), #[error("Existing 
primary key")] ExistingPrimaryKey, + #[error("Internal Index Error: {0}")] + Internal(String), } +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for IndexError { + fn from(other: $other) -> Self { + Self::Internal(other.to_string()) + } + } + )* + } +} + +internal_error!( + anyhow::Error, + heed::Error, + tokio::task::JoinError, + std::io::Error +); + #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait IndexActorHandle { @@ -97,6 +118,7 @@ pub trait IndexActorHandle { index_settings: IndexSettings, ) -> IndexResult; async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; async fn get_index_stats(&self, uuid: Uuid) -> IndexResult; } @@ -177,6 +199,10 @@ mod test { self.as_ref().snapshot(uuid, path).await } + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + self.as_ref().dump(uuid, path).await + } + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { self.as_ref().get_index_stats(uuid).await } diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 44f076f2f..11791be48 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; -use heed::EnvOpenOptions; use tokio::fs; use tokio::sync::RwLock; use tokio::task::spawn_blocking; @@ -48,7 +47,7 @@ impl IndexStore for MapIndexStore { let index_size = self.index_size; let index = spawn_blocking(move || -> IndexResult { - let index = open_index(&path, index_size)?; + let index = Index::open(path, index_size)?; if let Some(primary_key) = primary_key { let mut txn = index.write_txn()?; index.put_primary_key(&mut txn, &primary_key)?; @@ -56,8 +55,7 @@ impl IndexStore for MapIndexStore { } Ok(index) }) - .await - .map_err(|e| IndexError::Error(e.into()))??; + .await??; self.index_store.write().await.insert(uuid, index.clone()); @@ -77,9 +75,7 @@ impl IndexStore for MapIndexStore { } let index_size = self.index_size; - let index = spawn_blocking(move || open_index(path, index_size)) - .await - .map_err(|e| IndexError::Error(e.into()))??; + let index = spawn_blocking(move || Index::open(path, index_size)).await??; self.index_store.write().await.insert(uuid, index.clone()); Ok(Some(index)) } @@ -88,18 +84,8 @@ impl IndexStore for MapIndexStore { async fn delete(&self, uuid: Uuid) -> IndexResult> { let db_path = self.path.join(format!("index-{}", uuid)); - fs::remove_dir_all(db_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + fs::remove_dir_all(db_path).await?; let index = self.index_store.write().await.remove(&uuid); Ok(index) } } - -fn open_index(path: impl AsRef, size: usize) -> IndexResult { - std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, &path).map_err(IndexError::Error)?; - Ok(Index(Arc::new(index))) -} diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index f1da36740..f562d2185 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -14,19 +14,23 @@ use tokio::sync::mpsc; use tokio::time::sleep; use uuid::Uuid; -pub use updates::*; +use dump_actor::DumpActorHandle; +pub use 
dump_actor::{DumpInfo, DumpStatus};
 use index_actor::IndexActorHandle;
-use snapshot::{SnapshotService, load_snapshot};
+use snapshot::{load_snapshot, SnapshotService};
 use update_actor::UpdateActorHandle;
-use uuid_resolver::{UuidError, UuidResolverHandle};
+pub use updates::*;
+use uuid_resolver::{UuidResolverError, UuidResolverHandle};
 
 use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
 use crate::option::Opt;
 
+use self::dump_actor::load_dump;
+
+mod dump_actor;
 mod index_actor;
 mod snapshot;
 mod update_actor;
-mod update_handler;
 mod updates;
 mod uuid_resolver;
 
@@ -60,10 +64,12 @@ pub struct IndexStats {
     pub fields_distribution: FieldsDistribution,
 }
 
+#[derive(Clone)]
 pub struct IndexController {
     uuid_resolver: uuid_resolver::UuidResolverHandleImpl,
     index_handle: index_actor::IndexActorHandleImpl,
     update_handle: update_actor::UpdateActorHandleImpl,
+    dump_handle: dump_actor::DumpActorHandleImpl,
 }
 
 #[derive(Serialize)]
@@ -87,6 +93,14 @@ impl IndexController {
                 options.ignore_snapshot_if_db_exists,
                 options.ignore_missing_snapshot,
             )?;
+        } else if let Some(ref src_path) = options.import_dump {
+            load_dump(
+                &options.db_path,
+                src_path,
+                options.max_mdb_size.get_bytes() as usize,
+                options.max_udb_size.get_bytes() as usize,
+                &options.indexer_options,
+            )?;
         }
 
         std::fs::create_dir_all(&path)?;
@@ -98,6 +112,13 @@ impl IndexController {
             &path,
             update_store_size,
         )?;
+        let dump_handle = dump_actor::DumpActorHandleImpl::new(
+            &options.dumps_dir,
+            uuid_resolver.clone(),
+            update_handle.clone(),
+            options.max_mdb_size.get_bytes() as usize,
+            options.max_udb_size.get_bytes() as usize,
+        )?;
 
         if options.schedule_snapshot {
             let snapshot_service = SnapshotService::new(
@@ -119,6 +140,7 @@ impl IndexController {
             uuid_resolver,
             index_handle,
             update_handle,
+            dump_handle,
         })
     }
 
@@ -143,11 +165,6 @@ impl IndexController {
         // registered and the update_actor that waits for the payload to be sent to it.
         tokio::task::spawn_local(async move {
             payload
-                .map(|bytes| {
-                    bytes.map_err(|e| {
-                        Box::new(e) as Box<dyn std::error::Error + Send + Sync + 'static>
-                    })
-                })
                 .for_each(|r| async {
                     let _ = sender.send(r).await;
                 })
@@ -160,7 +177,7 @@ impl IndexController {
 
         match self.uuid_resolver.get(uid).await {
             Ok(uuid) => Ok(perform_update(uuid).await?),
-            Err(UuidError::UnexistingIndex(name)) => {
+            Err(UuidResolverError::UnexistingIndex(name)) => {
                 let uuid = Uuid::new_v4();
                 let status = perform_update(uuid).await?;
                 // ignore if index creation fails now, since it may already have been created
@@ -206,7 +223,7 @@ impl IndexController {
         create: bool,
     ) -> anyhow::Result<UpdateStatus> {
         let perform_udpate = |uuid| async move {
-            let meta = UpdateMeta::Settings(settings);
+            let meta = UpdateMeta::Settings(settings.into_unchecked());
            // Nothing to send; drop the sender right away, so as not to block the update actor.
             let (_, receiver) = mpsc::channel(1);
             self.update_handle.update(meta, receiver, uuid).await
         };
 
         match self.uuid_resolver.get(uid).await {
             Ok(uuid) => Ok(perform_udpate(uuid).await?),
-            Err(UuidError::UnexistingIndex(name)) if create => {
+            Err(UuidResolverError::UnexistingIndex(name)) if create => {
                 let uuid = Uuid::new_v4();
                 let status = perform_udpate(uuid).await?;
                 // ignore if index creation fails now, since it may already have been created
@@ -393,6 +410,14 @@ impl IndexController {
             indexes,
         })
     }
+
+    pub async fn create_dump(&self) -> anyhow::Result<DumpInfo> {
+        Ok(self.dump_handle.create_dump().await?)
+    }
+
+    pub async fn dump_info(&self, uid: String) -> anyhow::Result<DumpInfo> {
+        Ok(self.dump_handle.dump_info(uid).await?)
+    }
 }
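Both new `IndexController` methods are thin wrappers: every handle in this PR talks to its actor through the same request/response shape, an mpsc message carrying a oneshot sender for the reply. A dependency-light sketch of that pattern, with illustrative names rather than the PR's real message types (only `tokio` is assumed):

```rust
use tokio::sync::{mpsc, oneshot};

// Each request carries the oneshot sender the actor replies through.
enum Msg {
    CreateDump {
        ret: oneshot::Sender<Result<String, String>>,
    },
}

async fn actor(mut inbox: mpsc::Receiver<Msg>) {
    // The actor owns its state and processes requests one at a time.
    while let Some(msg) = inbox.recv().await {
        match msg {
            Msg::CreateDump { ret } => {
                // Do the work, then answer through the oneshot.
                let _ = ret.send(Ok("dump-uid".into()));
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (sender, inbox) = mpsc::channel(100);
    tokio::spawn(actor(inbox));

    // Handle side: send the request, then await the reply.
    let (ret, receiver) = oneshot::channel();
    let _ = sender.send(Msg::CreateDump { ret }).await;
    let uid = receiver.await.expect("actor has been killed").unwrap();
    assert_eq!(uid, "dump-uid");
}
```

The `expect("actor has been killed")` mirrors the real handles: the oneshot only fails if the actor dropped the sender, i.e. if the actor task itself is gone.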
 
 pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs
index 2a456eb26..daef7d582 100644
--- a/meilisearch-http/src/index_controller/snapshot.rs
+++ b/meilisearch-http/src/index_controller/snapshot.rs
@@ -144,7 +144,7 @@ mod test {
     use crate::index_controller::update_actor::{
         MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError,
     };
-    use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError};
+    use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidResolverError};
 
     #[actix_rt::test]
     async fn test_normal() {
@@ -193,7 +193,7 @@ mod test {
             .expect_snapshot()
             .times(1)
             // arbitrary error
-            .returning(|_| Box::pin(err(UuidError::NameAlreadyExist)));
+            .returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
 
         let update_handle = MockUpdateActorHandle::new();
 
@@ -248,7 +248,7 @@ mod test {
             // we expect the function to be called between 2 and 3 times in the given interval.
             .times(2..4)
             // arbitrary error, to short-circuit the function
-            .returning(move |_| Box::pin(err(UuidError::NameAlreadyExist)));
+            .returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
 
         let update_handle = MockUpdateActorHandle::new();
 
diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs
index b26ba4b8e..7779f2556 100644
--- a/meilisearch-http/src/index_controller/update_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/update_actor/actor.rs
@@ -11,7 +11,7 @@ use tokio::sync::mpsc;
 use uuid::Uuid;
 
 use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
-use crate::index_controller::index_actor::{IndexActorHandle};
+use crate::index_controller::index_actor::IndexActorHandle;
 use crate::index_controller::{UpdateMeta, UpdateStatus};
 
 pub struct UpdateActor {
@@ -42,7 +42,12 @@ where
         let store = UpdateStore::open(options, &path, index_handle.clone())?;
         std::fs::create_dir_all(path.join("update_files"))?;
         assert!(path.exists());
-        Ok(Self { path, store, inbox, index_handle })
+        Ok(Self {
+            path,
+            store,
+            inbox,
+            index_handle,
+        })
     }
 
     pub async fn run(mut self) {
@@ -72,6 +77,9 @@ where
                 Some(Snapshot { uuids, path, ret }) => {
                     let _ = ret.send(self.handle_snapshot(uuids, path).await);
                 }
+                Some(Dump { uuids, path, ret }) => {
+                    let _ = ret.send(self.handle_dump(uuids, path).await);
+                }
                 Some(GetInfo { ret }) => {
                     let _ = ret.send(self.handle_get_info().await);
                 }
@@ -86,11 +94,8 @@ where
         meta: UpdateMeta,
         mut payload: mpsc::Receiver<PayloadData<D>>,
     ) -> Result<UpdateStatus> {
-        let file_path = match meta {
-            UpdateMeta::DocumentsAddition { .. }
-            | UpdateMeta::DeleteDocuments => {
-
+            UpdateMeta::DocumentsAddition { ..
} | UpdateMeta::DeleteDocuments => { let update_file_id = uuid::Uuid::new_v4(); let path = self .path @@ -100,39 +105,26 @@ where .write(true) .create(true) .open(&path) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; + .await?; let mut file_len = 0; while let Some(bytes) = payload.recv().await { - match bytes { - Ok(bytes) => { - file_len += bytes.as_ref().len(); - file.write_all(bytes.as_ref()) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - } - Err(e) => { - return Err(UpdateError::Error(e)); - } - } + let bytes = bytes?; + file_len += bytes.as_ref().len(); + file.write_all(bytes.as_ref()).await?; } if file_len != 0 { - file.flush() - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; + file.flush().await?; let file = file.into_std().await; - Some((file, path)) + Some((file, update_file_id)) } else { // empty update, delete the empty file. - fs::remove_file(&path) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; + fs::remove_file(&path).await?; None } } - _ => None + _ => None, }; let update_store = self.store.clone(); @@ -141,52 +133,45 @@ where use std::io::{copy, sink, BufReader, Seek}; // If the payload is empty, ignore the check. - let path = if let Some((mut file, path)) = file_path { + let update_uuid = if let Some((mut file, uuid)) = file_path { // set the file back to the beginning - file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?; + file.seek(SeekFrom::Start(0))?; // Check that the json payload is valid: let reader = BufReader::new(&mut file); let mut checker = JsonChecker::new(reader); if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() { // The json file is invalid, we use Serde to get a nice error message: - file.seek(SeekFrom::Start(0)) - .map_err(|e| UpdateError::Error(Box::new(e)))?; - let _: serde_json::Value = serde_json::from_reader(file) - .map_err(|e| UpdateError::Error(Box::new(e)))?; + file.seek(SeekFrom::Start(0))?; + let _: serde_json::Value = serde_json::from_reader(file)?; } - Some(path) + Some(uuid) } else { None }; // The payload is valid, we can register it to the update store. - update_store - .register_update(meta, path, uuid) - .map(UpdateStatus::Enqueued) - .map_err(|e| UpdateError::Error(Box::new(e))) + let status = update_store + .register_update(meta, update_uuid, uuid) + .map(UpdateStatus::Enqueued)?; + Ok(status) }) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))? + .await? } async fn handle_list_updates(&self, uuid: Uuid) -> Result> { let update_store = self.store.clone(); tokio::task::spawn_blocking(move || { - let result = update_store - .list(uuid) - .map_err(|e| UpdateError::Error(e.into()))?; + let result = update_store.list(uuid)?; Ok(result) }) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))? + .await? } async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { let store = self.store.clone(); let result = store - .meta(uuid, id) - .map_err(|e| UpdateError::Error(Box::new(e)))? + .meta(uuid, id)? .ok_or(UpdateError::UnexistingUpdate(id))?; Ok(result) } @@ -194,10 +179,7 @@ where async fn handle_delete(&self, uuid: Uuid) -> Result<()> { let store = self.store.clone(); - tokio::task::spawn_blocking(move || store.delete_all(uuid)) - .await - .map_err(|e| UpdateError::Error(e.into()))? 
- .map_err(|e| UpdateError::Error(e.into()))?; + tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??; Ok(()) } @@ -206,10 +188,21 @@ where let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); - tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; + tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) + .await??; + + Ok(()) + } + + async fn handle_dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { + let index_handle = self.index_handle.clone(); + let update_store = self.store.clone(); + + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + update_store.dump(&uuids, path.to_path_buf(), index_handle)?; + Ok(()) + }) + .await??; Ok(()) } @@ -220,9 +213,7 @@ where let info = update_store.get_info()?; Ok(info) }) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; + .await??; Ok(info) } diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index 999481573..cc5ba9757 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -71,6 +71,13 @@ where receiver.await.expect("update actor killed.") } + async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Dump { uuids, path, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + async fn get_info(&self) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::GetInfo { ret }; diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index 17b2b3579..37df2af32 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -31,6 +31,11 @@ pub enum UpdateMsg { path: PathBuf, ret: oneshot::Sender>, }, + Dump { + uuids: HashSet, + path: PathBuf, + ret: oneshot::Sender>, + }, GetInfo { ret: oneshot::Sender>, }, diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index e7a12b7ff..ba89eebe3 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -1,10 +1,11 @@ mod actor; mod handle_impl; mod message; -mod update_store; +pub mod store; use std::{collections::HashSet, path::PathBuf}; +use actix_http::error::PayloadError; use thiserror::Error; use tokio::sync::mpsc; use uuid::Uuid; @@ -13,25 +14,45 @@ use crate::index_controller::{UpdateMeta, UpdateStatus}; use actor::UpdateActor; use message::UpdateMsg; -use update_store::UpdateStore; -pub use update_store::UpdateStoreInfo; pub use handle_impl::UpdateActorHandleImpl; +pub use store::{UpdateStore, UpdateStoreInfo}; pub type Result = std::result::Result; -type PayloadData = std::result::Result>; +type PayloadData = std::result::Result; #[cfg(test)] use mockall::automock; #[derive(Debug, Error)] pub enum UpdateError { - #[error("error with update: {0}")] - Error(Box), #[error("Update {0} doesn't exist.")] UnexistingUpdate(u64), + #[error("Internal error processing update: {0}")] + 
Internal(String), } +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for UpdateError { + fn from(other: $other) -> Self { + Self::Internal(other.to_string()) + } + } + )* + } +} + +internal_error!( + heed::Error, + std::io::Error, + serde_json::Error, + PayloadError, + tokio::task::JoinError, + anyhow::Error +); + #[async_trait::async_trait] #[cfg_attr(test, automock(type Data=Vec;))] pub trait UpdateActorHandle { @@ -40,7 +61,8 @@ pub trait UpdateActorHandle { async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; async fn update_status(&self, uuid: Uuid, id: u64) -> Result; async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()>; + async fn snapshot(&self, uuid: HashSet, path: PathBuf) -> Result<()>; + async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()>; async fn get_info(&self) -> Result; async fn update( &self, diff --git a/meilisearch-http/src/index_controller/update_actor/store/codec.rs b/meilisearch-http/src/index_controller/update_actor/store/codec.rs new file mode 100644 index 000000000..e07b52eec --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/store/codec.rs @@ -0,0 +1,86 @@ +use std::{borrow::Cow, convert::TryInto, mem::size_of}; + +use heed::{BytesDecode, BytesEncode}; +use uuid::Uuid; + +pub struct NextIdCodec; + +pub enum NextIdKey { + Global, + Index(Uuid), +} + +impl<'a> BytesEncode<'a> for NextIdCodec { + type EItem = NextIdKey; + + fn bytes_encode(item: &'a Self::EItem) -> Option> { + match item { + NextIdKey::Global => Some(Cow::Borrowed(b"__global__")), + NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())), + } + } +} + +pub struct PendingKeyCodec; + +impl<'a> BytesEncode<'a> for PendingKeyCodec { + type EItem = (u64, Uuid, u64); + + fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option> { + let mut bytes = Vec::with_capacity(size_of::()); + bytes.extend_from_slice(&global_id.to_be_bytes()); + bytes.extend_from_slice(uuid.as_bytes()); + bytes.extend_from_slice(&update_id.to_be_bytes()); + Some(Cow::Owned(bytes)) + } +} + +impl<'a> BytesDecode<'a> for PendingKeyCodec { + type DItem = (u64, Uuid, u64); + + fn bytes_decode(bytes: &'a [u8]) -> Option { + let global_id_bytes = bytes.get(0..size_of::())?.try_into().ok()?; + let global_id = u64::from_be_bytes(global_id_bytes); + + let uuid_bytes = bytes + .get(size_of::()..(size_of::() + size_of::()))? + .try_into() + .ok()?; + let uuid = Uuid::from_bytes(uuid_bytes); + + let update_id_bytes = bytes + .get((size_of::() + size_of::())..)? 
+            .try_into()
+            .ok()?;
+        let update_id = u64::from_be_bytes(update_id_bytes);
+
+        Some((global_id, uuid, update_id))
+    }
+}
+
+pub struct UpdateKeyCodec;
+
+impl<'a> BytesEncode<'a> for UpdateKeyCodec {
+    type EItem = (Uuid, u64);
+
+    fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
+        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
+        bytes.extend_from_slice(uuid.as_bytes());
+        bytes.extend_from_slice(&update_id.to_be_bytes());
+        Some(Cow::Owned(bytes))
+    }
+}
+
+impl<'a> BytesDecode<'a> for UpdateKeyCodec {
+    type DItem = (Uuid, u64);
+
+    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
+        let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
+        let uuid = Uuid::from_bytes(uuid_bytes);
+
+        let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
+        let update_id = u64::from_be_bytes(update_id_bytes);
+
+        Some((uuid, update_id))
+    }
+}
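One detail worth calling out in these codecs: the ids are written with `to_be_bytes`, because LMDB orders keys lexicographically by their bytes, and only the big-endian encoding makes that byte order agree with numeric order (which is presumably what keeps the pending queue sorted by global id). A std-only check of that property:

```rust
fn main() {
    // Big-endian bytes compare lexicographically in the same order as the
    // integers themselves, so LMDB iterates keys in numeric order.
    let a = 1u64.to_be_bytes();
    let b = 256u64.to_be_bytes();
    assert!(a < b); // holds for every pair of u64s

    // Little-endian breaks that property:
    // 1   = [1, 0, 0, 0, 0, 0, 0, 0]
    // 256 = [0, 1, 0, 0, 0, 0, 0, 0]
    let a = 1u64.to_le_bytes();
    let b = 256u64.to_le_bytes();
    assert!(a > b); // lexicographic order no longer matches numeric order
}
```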
diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs
new file mode 100644
index 000000000..8f947e459
--- /dev/null
+++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs
@@ -0,0 +1,189 @@
+use std::{
+    collections::HashSet,
+    fs::{create_dir_all, File},
+    io::{BufRead, BufReader, Write},
+    path::{Path, PathBuf},
+};
+
+use heed::{EnvOpenOptions, RoTxn};
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+use super::UpdateStore;
+use super::{codec::UpdateKeyCodec, State};
+use crate::index_controller::{
+    index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued,
+    UpdateStatus,
+};
+
+#[derive(Serialize, Deserialize)]
+struct UpdateEntry {
+    uuid: Uuid,
+    update: UpdateStatus,
+}
+
+impl UpdateStore {
+    pub fn dump(
+        &self,
+        uuids: &HashSet<Uuid>,
+        path: PathBuf,
+        handle: impl IndexActorHandle,
+    ) -> anyhow::Result<()> {
+        let state_lock = self.state.write();
+        state_lock.swap(State::Dumping);
+
+        // txn must *always* be acquired after state lock, or it will deadlock.
+        let txn = self.env.write_txn()?;
+
+        let dump_path = path.join("updates");
+        create_dir_all(&dump_path)?;
+
+        self.dump_updates(&txn, uuids, &dump_path)?;
+
+        let fut = dump_indexes(uuids, handle, &path);
+        tokio::runtime::Handle::current().block_on(fut)?;
+
+        state_lock.swap(State::Idle);
+
+        Ok(())
+    }
+
+    fn dump_updates(
+        &self,
+        txn: &RoTxn,
+        uuids: &HashSet<Uuid>,
+        path: impl AsRef<Path>,
+    ) -> anyhow::Result<()> {
+        let dump_data_path = path.as_ref().join("data.jsonl");
+        let mut dump_data_file = File::create(dump_data_path)?;
+
+        let update_files_path = path.as_ref().join(super::UPDATE_DIR);
+        create_dir_all(&update_files_path)?;
+
+        self.dump_pending(&txn, uuids, &mut dump_data_file, &path)?;
+        self.dump_completed(&txn, uuids, &mut dump_data_file)?;
+
+        Ok(())
+    }
+
+    fn dump_pending(
+        &self,
+        txn: &RoTxn,
+        uuids: &HashSet<Uuid>,
+        mut file: &mut File,
+        dst_path: impl AsRef<Path>,
+    ) -> anyhow::Result<()> {
+        let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
+
+        for pending in pendings {
+            let ((_, uuid, _), data) = pending?;
+            if uuids.contains(&uuid) {
+                let update = data.decode()?;
+
+                if let Some(ref update_uuid) = update.content {
+                    let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
+                    let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
+                    std::fs::copy(src, dst)?;
+                }
+
+                let update_json = UpdateEntry {
+                    uuid,
+                    update: update.into(),
+                };
+
+                serde_json::to_writer(&mut file, &update_json)?;
+                file.write_all(b"\n")?;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn dump_completed(
+        &self,
+        txn: &RoTxn,
+        uuids: &HashSet<Uuid>,
+        mut file: &mut File,
+    ) -> anyhow::Result<()> {
+        let updates = self
+            .updates
+            .iter(txn)?
+            .remap_key_type::<UpdateKeyCodec>()
+            .lazily_decode_data();
+
+        for update in updates {
+            let ((uuid, _), data) = update?;
+            if uuids.contains(&uuid) {
+                let update = data.decode()?;
+
+                let update_json = UpdateEntry { uuid, update };
+
+                serde_json::to_writer(&mut file, &update_json)?;
+                file.write_all(b"\n")?;
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn load_dump(
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        db_size: usize,
+    ) -> anyhow::Result<()> {
+        let dst_update_path = dst.as_ref().join("updates/");
+        create_dir_all(&dst_update_path)?;
+
+        let mut options = EnvOpenOptions::new();
+        options.map_size(db_size as usize);
+        let (store, _) = UpdateStore::new(options, &dst_update_path)?;
+
+        let src_update_path = src.as_ref().join("updates");
+        let update_data = File::open(&src_update_path.join("data.jsonl"))?;
+        let mut update_data = BufReader::new(update_data);
+
+        std::fs::create_dir_all(dst_update_path.join("update_files/"))?;
+
+        let mut wtxn = store.env.write_txn()?;
+        let mut line = String::new();
+        loop {
+            match update_data.read_line(&mut line) {
+                Ok(0) => break,
+                Ok(_) => {
+                    let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
+                    store.register_raw_updates(&mut wtxn, &update, uuid)?;
+
+                    // Copy associated update file if it exists
+                    if let UpdateStatus::Enqueued(Enqueued {
+                        content: Some(uuid),
+                        ..
+ }) = update + { + let src = update_uuid_to_file_path(&src_update_path, uuid); + let dst = update_uuid_to_file_path(&dst_update_path, uuid); + std::fs::copy(src, dst)?; + } + } + _ => break, + } + + line.clear(); + } + + wtxn.commit()?; + + Ok(()) + } +} + +async fn dump_indexes( + uuids: &HashSet, + handle: impl IndexActorHandle, + path: impl AsRef, +) -> anyhow::Result<()> { + for uuid in uuids { + handle.dump(*uuid, path.as_ref().to_owned()).await?; + } + + Ok(()) +} diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs similarity index 80% rename from meilisearch-http/src/index_controller/update_actor/update_store.rs rename to meilisearch-http/src/index_controller/update_actor/store/mod.rs index 4e7acc7cf..28204f4c0 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -1,38 +1,35 @@ -use std::borrow::Cow; -use std::collections::{BTreeMap, HashSet}; -use std::convert::TryInto; +mod codec; +pub mod dump; + use std::fs::{copy, create_dir_all, remove_file, File}; -use std::mem::size_of; use std::path::Path; use std::sync::Arc; +use std::{ + collections::{BTreeMap, HashSet}, + path::PathBuf, +}; -use anyhow::Context; use arc_swap::ArcSwap; use futures::StreamExt; use heed::types::{ByteSlice, OwnedType, SerdeJson}; use heed::zerocopy::U64; -use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; +use heed::{CompactionOption, Database, Env, EnvOpenOptions}; +use log::error; use parking_lot::{Mutex, MutexGuard}; use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; +use codec::*; + use super::UpdateMeta; -use crate::index_controller::{updates::*, IndexActorHandle}; -use crate::{ - helpers::EnvSizer, - index_controller::index_actor::{IndexResult, CONCURRENT_INDEX_MSG}, -}; +use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; +use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult}; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; -struct NextIdCodec; - -enum NextIdKey { - Global, - Index(Uuid), -} +const UPDATE_DIR: &str = "update_files"; pub struct UpdateStoreInfo { /// Size of the update store in bytes. 
@@ -47,13 +44,13 @@ pub struct StateLock { data: ArcSwap, } -struct StateLockGuard<'a> { +pub struct StateLockGuard<'a> { _lock: MutexGuard<'a, ()>, state: &'a StateLock, } impl StateLockGuard<'_> { - fn swap(&self, state: State) -> Arc { + pub fn swap(&self, state: State) -> Arc { self.state.data.swap(Arc::new(state)) } } @@ -65,11 +62,11 @@ impl StateLock { Self { lock, data } } - fn read(&self) -> Arc { + pub fn read(&self) -> Arc { self.data.load().clone() } - fn write(&self) -> StateLockGuard { + pub fn write(&self) -> StateLockGuard { let _lock = self.lock.lock(); let state = &self; StateLockGuard { _lock, state } @@ -81,81 +78,7 @@ pub enum State { Idle, Processing(Uuid, Processing), Snapshoting, -} - -impl<'a> BytesEncode<'a> for NextIdCodec { - type EItem = NextIdKey; - - fn bytes_encode(item: &'a Self::EItem) -> Option> { - match item { - NextIdKey::Global => Some(Cow::Borrowed(b"__global__")), - NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())), - } - } -} - -struct PendingKeyCodec; - -impl<'a> BytesEncode<'a> for PendingKeyCodec { - type EItem = (u64, Uuid, u64); - - fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option> { - let mut bytes = Vec::with_capacity(size_of::()); - bytes.extend_from_slice(&global_id.to_be_bytes()); - bytes.extend_from_slice(uuid.as_bytes()); - bytes.extend_from_slice(&update_id.to_be_bytes()); - Some(Cow::Owned(bytes)) - } -} - -impl<'a> BytesDecode<'a> for PendingKeyCodec { - type DItem = (u64, Uuid, u64); - - fn bytes_decode(bytes: &'a [u8]) -> Option { - let global_id_bytes = bytes.get(0..size_of::())?.try_into().ok()?; - let global_id = u64::from_be_bytes(global_id_bytes); - - let uuid_bytes = bytes - .get(size_of::()..(size_of::() + size_of::()))? - .try_into() - .ok()?; - let uuid = Uuid::from_bytes(uuid_bytes); - - let update_id_bytes = bytes - .get((size_of::() + size_of::())..)? - .try_into() - .ok()?; - let update_id = u64::from_be_bytes(update_id_bytes); - - Some((global_id, uuid, update_id)) - } -} - -struct UpdateKeyCodec; - -impl<'a> BytesEncode<'a> for UpdateKeyCodec { - type EItem = (Uuid, u64); - - fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option> { - let mut bytes = Vec::with_capacity(size_of::()); - bytes.extend_from_slice(uuid.as_bytes()); - bytes.extend_from_slice(&update_id.to_be_bytes()); - Some(Cow::Owned(bytes)) - } -} - -impl<'a> BytesDecode<'a> for UpdateKeyCodec { - type DItem = (Uuid, u64); - - fn bytes_decode(bytes: &'a [u8]) -> Option { - let uuid_bytes = bytes.get(0..size_of::())?.try_into().ok()?; - let uuid = Uuid::from_bytes(uuid_bytes); - - let update_id_bytes = bytes.get(size_of::()..)?.try_into().ok()?; - let update_id = u64::from_be_bytes(update_id_bytes); - - Some((uuid, update_id)) - } + Dumping, } #[derive(Clone)] @@ -175,46 +98,61 @@ pub struct UpdateStore { /// | 16-bytes | 8-bytes | updates: Database>, /// Indicates the current state of the update store, - state: Arc, + pub state: Arc, /// Wake up the loop when a new event occurs. 
notification_sender: mpsc::Sender<()>,
+    path: PathBuf,
 }
 
 impl UpdateStore {
-    pub fn open(
+    fn new(
         mut options: EnvOpenOptions,
         path: impl AsRef<Path>,
-        index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
-    ) -> anyhow::Result<Arc<Self>> {
+    ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
         options.max_dbs(5);
-        let env = options.open(path)?;
+        let env = options.open(&path)?;
         let pending_queue = env.create_database(Some("pending-queue"))?;
         let next_update_id = env.create_database(Some("next-update-id"))?;
         let updates = env.create_database(Some("updates"))?;
-        let (notification_sender, mut notification_receiver) = mpsc::channel(10);
-        // Send a first notification to trigger the process.
-        let _ = notification_sender.send(());
-
         let state = Arc::new(StateLock::from_state(State::Idle));
 
+        let (notification_sender, notification_receiver) = mpsc::channel(10);
+
+        Ok((
+            Self {
+                env,
+                pending_queue,
+                next_update_id,
+                updates,
+                state,
+                notification_sender,
+                path: path.as_ref().to_owned(),
+            },
+            notification_receiver,
+        ))
+    }
+
+    pub fn open(
+        options: EnvOpenOptions,
+        path: impl AsRef<Path>,
+        index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
+    ) -> anyhow::Result<Arc<Self>> {
+        let (update_store, mut notification_receiver) = Self::new(options, path)?;
+        let update_store = Arc::new(update_store);
+
+        // Send a first notification to trigger the process.
+        let _ = update_store.notification_sender.send(());
+
         // Init update loop to perform any pending updates at launch.
         // Since we just launched the update store, and we still own the receiving end of the
         // channel, this call is guaranteed to succeed.
-        notification_sender
+        update_store
+            .notification_sender
             .try_send(())
             .expect("Failed to init update store");
 
-        let update_store = Arc::new(UpdateStore {
-            env,
-            pending_queue,
-            next_update_id,
-            updates,
-            state,
-            notification_sender,
-        });
-
         // We need a weak reference so we can take ownership on the arc later when we
         // want to close the index.
         let update_store_weak = Arc::downgrade(&update_store);
@@ -233,7 +171,7 @@ impl UpdateStore {
                     match res {
                         Ok(Some(_)) => (),
                         Ok(None) => break,
-                        Err(e) => eprintln!("error while processing update: {}", e),
+                        Err(e) => error!("error while processing update: {}", e),
                     }
                 }
                 // the ownership on the arc has been taken, we need to exit.
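The hunk below splits id allocation in two: `next_update_id` bumps a global counter (which orders the pending queue across all indexes) together with a per-index counter (the update id users see), while `next_update_id_raw` bumps only the latter, for restoring already processed updates from a dump. A std-only sketch of that bookkeeping, with a `HashMap` standing in for the heed databases:

```rust
use std::collections::HashMap;

/// Stand-in for the `next-update-id` database: one global counter
/// plus one counter per index.
#[derive(Default)]
struct NextIds {
    global: u64,
    per_index: HashMap<&'static str, u64>,
}

impl NextIds {
    /// Mirrors `next_update_id`: bump both counters.
    fn next(&mut self, index: &'static str) -> (u64, u64) {
        let global = self.global;
        self.global += 1;
        (global, self.next_raw(index))
    }

    /// Mirrors `next_update_id_raw`: bump only the per-index counter.
    fn next_raw(&mut self, index: &'static str) -> u64 {
        let id = self.per_index.entry(index).or_default();
        let current = *id;
        *id += 1;
        current
    }
}

fn main() {
    let mut ids = NextIds::default();
    assert_eq!(ids.next("movies"), (0, 0));
    // The global id keeps growing across indexes; per-index ids restart.
    assert_eq!(ids.next("songs"), (1, 0));
    assert_eq!(ids.next("movies"), (2, 1));
}
```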
@@ -253,21 +191,31 @@
             .get(txn, &NextIdKey::Global)?
             .map(U64::get)
             .unwrap_or_default();
-        self.next_update_id
-            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
+
+        self.next_update_id
+            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
+
+        let update_id = self.next_update_id_raw(txn, index_uuid)?;
+
+        Ok((global_id, update_id))
+    }
+
+    /// Returns the next update id for a given `index_uuid` without
+    /// incrementing the global update id. This is useful for the dumps.
+    fn next_update_id_raw(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<u64> {
         let update_id = self
             .next_update_id
             .get(txn, &NextIdKey::Index(index_uuid))?
             .map(U64::get)
             .unwrap_or_default();
 
         self.next_update_id.put(
             txn,
             &NextIdKey::Index(index_uuid),
             &BEU64::new(update_id + 1),
         )?;
 
-        Ok((global_id, update_id))
+        Ok(update_id)
     }
 
     /// Registers the update content in the pending store and the meta
@@ -275,13 +223,13 @@ impl UpdateStore {
     pub fn register_update(
         &self,
         meta: UpdateMeta,
-        content: Option<PathBuf>,
+        content: Option<Uuid>,
         index_uuid: Uuid,
     ) -> heed::Result<Enqueued> {
         let mut txn = self.env.write_txn()?;
         let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
-        let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned()));
+        let meta = Enqueued::new(meta, update_id, content);
 
         self.pending_queue
             .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
@@ -294,6 +242,35 @@ impl UpdateStore {
         Ok(meta)
     }
 
+    /// Push an already processed update into the UpdateStore without triggering the notification
+    /// process. This is useful for the dumps.
+    pub fn register_raw_updates(
+        &self,
+        wtxn: &mut heed::RwTxn,
+        update: &UpdateStatus,
+        index_uuid: Uuid,
+    ) -> heed::Result<()> {
+        match update {
+            UpdateStatus::Enqueued(enqueued) => {
+                let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
+                self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
+                    wtxn,
+                    &(global_id, index_uuid, enqueued.id()),
+                    &enqueued,
+                )?;
+            }
+            _ => {
+                let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
+                self.updates.remap_key_type::<UpdateKeyCodec>().put(
+                    wtxn,
+                    &(index_uuid, update.id()),
+                    &update,
+                )?;
+            }
+        }
+        Ok(())
+    }
+
     /// Executes the user provided function on the next pending update (the one with the lowest id).
     /// This is asynchronous as it lets the user process the update with a read-only txn and
     /// only writing the result meta to the processed-meta store *after* it has been processed.
@@ -314,13 +291,14 @@ impl UpdateStore {
         let processing = pending.processing();
 
         // Acquire the state lock and set the current state to processing.
+        // txn must *always* be acquired after state lock, or it will deadlock.
         let state = self.state.write();
         state.swap(State::Processing(index_uuid, processing.clone()));
 
         let file = match content_path {
-            Some(ref path) => {
-                let file = File::open(path)
-                    .with_context(|| format!("file at path: {:?}", &content_path))?;
+            Some(uuid) => {
+                let path = update_uuid_to_file_path(&self.path, uuid);
+                let file = File::open(path)?;
                 Some(file)
             }
             None => None,
@@ -336,7 +314,8 @@ impl UpdateStore {
             self.pending_queue
                 .delete(&mut wtxn, &(global_id, index_uuid, update_id))?;
 
-            if let Some(path) = content_path {
+            if let Some(uuid) = content_path {
+                let path = update_uuid_to_file_path(&self.path, uuid);
                 remove_file(&path)?;
             }
 
@@ -436,7 +415,7 @@ impl UpdateStore {
     pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> {
         let mut txn = self.env.write_txn()?;
         // Contains all the content file paths that need to be removed if the deletion was successful.
- let mut paths_to_remove = Vec::new(); + let mut uuids_to_remove = Vec::new(); let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data(); @@ -444,8 +423,8 @@ impl UpdateStore { if uuid == index_uuid { pendings.del_current()?; let mut pending = pending.decode()?; - if let Some(path) = pending.content.take() { - paths_to_remove.push(path); + if let Some(update_uuid) = pending.content.take() { + uuids_to_remove.push(update_uuid); } } } @@ -465,9 +444,12 @@ impl UpdateStore { txn.commit()?; - paths_to_remove.iter().for_each(|path| { - let _ = remove_file(path); - }); + uuids_to_remove + .iter() + .map(|uuid| update_uuid_to_file_path(&self.path, *uuid)) + .for_each(|path| { + let _ = remove_file(path); + }); // We don't care about the currently processing update, since it will be removed by itself // once its done processing, and we can't abort a running update. @@ -496,7 +478,7 @@ impl UpdateStore { // create db snapshot self.env.copy_to_path(&db_path, CompactionOption::Enabled)?; - let update_files_path = update_path.join("update_files"); + let update_files_path = update_path.join(UPDATE_DIR); create_dir_all(&update_files_path)?; let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); @@ -504,10 +486,13 @@ impl UpdateStore { for entry in pendings { let ((_, uuid, _), pending) = entry?; if uuids.contains(&uuid) { - if let Some(path) = pending.decode()?.content_path() { - let name = path.file_name().unwrap(); - let to = update_files_path.join(name); - copy(path, to)?; + if let Enqueued { + content: Some(uuid), + .. + } = pending.decode()? + { + let path = update_uuid_to_file_path(&self.path, uuid); + copy(path, &update_files_path)?; } } } @@ -533,14 +518,17 @@ impl UpdateStore { pub fn get_info(&self) -> anyhow::Result { let mut size = self.env.size(); let txn = self.env.read_txn()?; - for entry in self.pending_queue.iter(&txn)? { let (_, pending) = entry?; - if let Some(path) = pending.content_path() { + if let Enqueued { + content: Some(uuid), + .. 
+ } = pending + { + let path = update_uuid_to_file_path(&self.path, uuid); size += File::open(path)?.metadata()?.len(); } } - let processing = match *self.state.read() { State::Processing(uuid, _) => Some(uuid), _ => None, @@ -550,6 +538,12 @@ impl UpdateStore { } } +fn update_uuid_to_file_path(root: impl AsRef, uuid: Uuid) -> PathBuf { + root.as_ref() + .join(UPDATE_DIR) + .join(format!("update_{}", uuid)) +} + #[cfg(test)] mod test { use super::*; @@ -595,9 +589,7 @@ mod test { let uuid = Uuid::new_v4(); let store_clone = update_store.clone(); tokio::task::spawn_blocking(move || { - store_clone - .register_update(meta, Some("here"), uuid) - .unwrap(); + store_clone.register_update(meta, None, uuid).unwrap(); }) .await .unwrap(); diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index a129a25a0..303289df3 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -1,10 +1,9 @@ -use std::path::{Path, PathBuf}; - use chrono::{DateTime, Utc}; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; +use uuid::Uuid; -use crate::index::{Checked, Settings}; +use crate::index::{Unchecked, Settings}; pub type UpdateError = String; @@ -25,7 +24,7 @@ pub enum UpdateMeta { }, ClearDocuments, DeleteDocuments, - Settings(Settings), + Settings(Settings), } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -34,11 +33,11 @@ pub struct Enqueued { pub update_id: u64, pub meta: UpdateMeta, pub enqueued_at: DateTime, - pub content: Option, + pub content: Option, } impl Enqueued { - pub fn new(meta: UpdateMeta, update_id: u64, content: Option) -> Self { + pub fn new(meta: UpdateMeta, update_id: u64, content: Option) -> Self { Self { enqueued_at: Utc::now(), meta, @@ -68,10 +67,6 @@ impl Enqueued { pub fn id(&self) -> u64 { self.update_id } - - pub fn content_path(&self) -> Option<&Path> { - self.content.as_deref() - } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -152,7 +147,7 @@ impl Failed { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "status", rename_all = "camelCase")] pub enum UpdateStatus { Processing(Processing), diff --git a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs index 253326276..0211cef25 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs @@ -4,7 +4,7 @@ use log::{info, warn}; use tokio::sync::mpsc; use uuid::Uuid; -use super::{Result, UuidError, UuidResolveMsg, UuidStore}; +use super::{Result, UuidResolveMsg, UuidResolverError, UuidStore}; pub struct UuidResolverActor { inbox: mpsc::Receiver, @@ -44,6 +44,9 @@ impl UuidResolverActor { Some(GetSize { ret }) => { let _ = ret.send(self.handle_get_size().await); } + Some(DumpRequest { path, ret }) => { + let _ = ret.send(self.handle_dump(path).await); + } // all senders have been dropped, need to quit. None => break, } @@ -54,7 +57,7 @@ impl UuidResolverActor { async fn handle_create(&self, uid: String) -> Result { if !is_index_uid_valid(&uid) { - return Err(UuidError::BadlyFormatted(uid)); + return Err(UuidResolverError::BadlyFormatted(uid)); } self.store.create_uuid(uid, true).await } @@ -63,14 +66,14 @@ impl UuidResolverActor { self.store .get_uuid(uid.clone()) .await? 
- .ok_or(UuidError::UnexistingIndex(uid)) + .ok_or(UuidResolverError::UnexistingIndex(uid)) } async fn handle_delete(&self, uid: String) -> Result { self.store .delete(uid.clone()) .await? - .ok_or(UuidError::UnexistingIndex(uid)) + .ok_or(UuidResolverError::UnexistingIndex(uid)) } async fn handle_list(&self) -> Result> { @@ -82,9 +85,13 @@ impl UuidResolverActor { self.store.snapshot(path).await } + async fn handle_dump(&self, path: PathBuf) -> Result> { + self.store.dump(path).await + } + async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { if !is_index_uid_valid(&uid) { - return Err(UuidError::BadlyFormatted(uid)); + return Err(UuidResolverError::BadlyFormatted(uid)); } self.store.insert(uid, uuid).await?; Ok(()) diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs index db4c482bd..981beb0f6 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs @@ -85,4 +85,12 @@ impl UuidResolverHandle for UuidResolverHandleImpl { .await .expect("Uuid resolver actor has been killed")?) } + async fn dump(&self, path: PathBuf) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::DumpRequest { ret, path }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } } diff --git a/meilisearch-http/src/index_controller/uuid_resolver/message.rs b/meilisearch-http/src/index_controller/uuid_resolver/message.rs index a72bf0587..2092c67fd 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/message.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/message.rs @@ -34,4 +34,8 @@ pub enum UuidResolveMsg { GetSize { ret: oneshot::Sender>, }, + DumpRequest { + path: PathBuf, + ret: oneshot::Sender>>, + }, } diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs index ef17133ff..5bddadf02 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs @@ -1,7 +1,7 @@ mod actor; mod handle_impl; mod message; -mod store; +pub mod store; use std::collections::HashSet; use std::path::PathBuf; @@ -11,16 +11,17 @@ use uuid::Uuid; use actor::UuidResolverActor; use message::UuidResolveMsg; -use store::{HeedUuidStore, UuidStore}; +use store::UuidStore; #[cfg(test)] use mockall::automock; pub use handle_impl::UuidResolverHandleImpl; +pub use store::HeedUuidStore; const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB -pub type Result = std::result::Result; +pub type Result = std::result::Result; #[async_trait::async_trait] #[cfg_attr(test, automock)] @@ -32,20 +33,37 @@ pub trait UuidResolverHandle { async fn list(&self) -> anyhow::Result>; async fn snapshot(&self, path: PathBuf) -> Result>; async fn get_size(&self) -> Result; + async fn dump(&self, path: PathBuf) -> Result>; } #[derive(Debug, Error)] -pub enum UuidError { +pub enum UuidResolverError { #[error("Name already exist.")] NameAlreadyExist, #[error("Index \"{0}\" doesn't exist.")] UnexistingIndex(String), - #[error("Error performing task: {0}")] - TokioTask(#[from] tokio::task::JoinError), - #[error("Database error: {0}")] - Heed(#[from] heed::Error), - #[error("Uuid error: {0}")] - Uuid(#[from] uuid::Error), #[error("Badly formatted index uid: {0}")] BadlyFormatted(String), + #[error("Internal error 
resolving index uid: {0}")] + Internal(String), } + +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for UuidResolverError { + fn from(other: $other) -> Self { + Self::Internal(other.to_string()) + } + } + )* + } +} + +internal_error!( + heed::Error, + uuid::Error, + std::io::Error, + tokio::task::JoinError, + serde_json::Error +); diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs index 29c034c44..1d6ada269 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -1,18 +1,26 @@ -use std::path::{Path, PathBuf}; use std::collections::HashSet; -use std::fs::create_dir_all; +use std::fs::{create_dir_all, File}; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; -use heed::{ - types::{ByteSlice, Str}, - CompactionOption, Database, Env, EnvOpenOptions, -}; +use heed::types::{ByteSlice, Str}; +use heed::{CompactionOption, Database, Env, EnvOpenOptions}; +use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::{Result, UuidError, UUID_STORE_SIZE}; +use super::{Result, UuidResolverError, UUID_STORE_SIZE}; use crate::helpers::EnvSizer; +#[derive(Serialize, Deserialize)] +struct DumpEntry { + uuid: Uuid, + uid: String, +} + +const UUIDS_DB_PATH: &str = "index_uuids"; + #[async_trait::async_trait] -pub trait UuidStore { +pub trait UuidStore: Sized { // Create a new entry for `name`. Return an error if `err` and the entry already exists, return // the uuid otherwise. async fn create_uuid(&self, uid: String, err: bool) -> Result; @@ -22,8 +30,10 @@ pub trait UuidStore { async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; async fn snapshot(&self, path: PathBuf) -> Result>; async fn get_size(&self) -> Result; + async fn dump(&self, path: PathBuf) -> Result>; } +#[derive(Clone)] pub struct HeedUuidStore { env: Env, db: Database, @@ -31,7 +41,7 @@ pub struct HeedUuidStore { impl HeedUuidStore { pub fn new(path: impl AsRef) -> anyhow::Result { - let path = path.as_ref().join("index_uuids"); + let path = path.as_ref().join(UUIDS_DB_PATH); create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); options.map_size(UUID_STORE_SIZE); // 1GB @@ -39,123 +49,198 @@ impl HeedUuidStore { let db = env.create_database(None)?; Ok(Self { env, db }) } + + pub fn create_uuid(&self, name: String, err: bool) -> Result { + let env = self.env.clone(); + let db = self.db; + let mut txn = env.write_txn()?; + match db.get(&txn, &name)? { + Some(uuid) => { + if err { + Err(UuidResolverError::NameAlreadyExist) + } else { + let uuid = Uuid::from_slice(uuid)?; + Ok(uuid) + } + } + None => { + let uuid = Uuid::new_v4(); + db.put(&mut txn, &name, uuid.as_bytes())?; + txn.commit()?; + Ok(uuid) + } + } + } + pub fn get_uuid(&self, name: String) -> Result> { + let env = self.env.clone(); + let db = self.db; + let txn = env.read_txn()?; + match db.get(&txn, &name)? { + Some(uuid) => { + let uuid = Uuid::from_slice(uuid)?; + Ok(Some(uuid)) + } + None => Ok(None), + } + } + + pub fn delete(&self, uid: String) -> Result> { + let env = self.env.clone(); + let db = self.db; + let mut txn = env.write_txn()?; + match db.get(&txn, &uid)? 
{ + Some(uuid) => { + let uuid = Uuid::from_slice(uuid)?; + db.delete(&mut txn, &uid)?; + txn.commit()?; + Ok(Some(uuid)) + } + None => Ok(None), + } + } + + pub fn list(&self) -> Result> { + let env = self.env.clone(); + let db = self.db; + let txn = env.read_txn()?; + let mut entries = Vec::new(); + for entry in db.iter(&txn)? { + let (name, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.push((name.to_owned(), uuid)) + } + Ok(entries) + } + + pub fn insert(&self, name: String, uuid: Uuid) -> Result<()> { + let env = self.env.clone(); + let db = self.db; + let mut txn = env.write_txn()?; + db.put(&mut txn, &name, uuid.as_bytes())?; + txn.commit()?; + Ok(()) + } + + pub fn snapshot(&self, mut path: PathBuf) -> Result> { + let env = self.env.clone(); + let db = self.db; + // Write transaction to acquire a lock on the database. + let txn = env.write_txn()?; + let mut entries = HashSet::new(); + for entry in db.iter(&txn)? { + let (_, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.insert(uuid); + } + + // only perform snapshot if there are indexes + if !entries.is_empty() { + path.push(UUIDS_DB_PATH); + create_dir_all(&path).unwrap(); + path.push("data.mdb"); + env.copy_to_path(path, CompactionOption::Enabled)?; + } + Ok(entries) + } + + pub fn get_size(&self) -> Result { + Ok(self.env.size()) + } + + pub fn dump(&self, path: PathBuf) -> Result> { + let dump_path = path.join(UUIDS_DB_PATH); + create_dir_all(&dump_path)?; + let dump_file_path = dump_path.join("data.jsonl"); + let mut dump_file = File::create(&dump_file_path)?; + let mut uuids = HashSet::new(); + + let txn = self.env.read_txn()?; + for entry in self.db.iter(&txn)? { + let (uid, uuid) = entry?; + let uid = uid.to_string(); + let uuid = Uuid::from_slice(uuid)?; + + let entry = DumpEntry { uuid, uid }; + serde_json::to_writer(&mut dump_file, &entry)?; + dump_file.write_all(b"\n").unwrap(); + + uuids.insert(uuid); + } + + Ok(uuids) + } + + pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { + let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH); + std::fs::create_dir_all(&uuid_resolver_path)?; + + let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl"); + let indexes = File::open(&src_indexes)?; + let mut indexes = BufReader::new(indexes); + let mut line = String::new(); + + let db = Self::new(dst)?; + let mut txn = db.env.write_txn()?; + + loop { + match indexes.read_line(&mut line) { + Ok(0) => break, + Ok(_) => { + let DumpEntry { uuid, uid } = serde_json::from_str(&line)?; + println!("importing {} {}", uid, uuid); + db.db.put(&mut txn, &uid, uuid.as_bytes())?; + } + Err(e) => return Err(e.into()), + } + + line.clear(); + } + txn.commit()?; + + db.env.prepare_for_closing().wait(); + + Ok(()) + } } #[async_trait::async_trait] impl UuidStore for HeedUuidStore { async fn create_uuid(&self, name: String, err: bool) -> Result { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let mut txn = env.write_txn()?; - match db.get(&txn, &name)? { - Some(uuid) => { - if err { - Err(UuidError::NameAlreadyExist) - } else { - let uuid = Uuid::from_slice(uuid)?; - Ok(uuid) - } - } - None => { - let uuid = Uuid::new_v4(); - db.put(&mut txn, &name, uuid.as_bytes())?; - txn.commit()?; - Ok(uuid) - } - } - }) - .await? + let this = self.clone(); + tokio::task::spawn_blocking(move || this.create_uuid(name, err)).await? 
diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index fd5cf6786..26b6a784c 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -46,8 +46,8 @@ macro_rules! create_app {
             .configure(synonym::services)
             .configure(health::services)
             .configure(stats::services)
-            .configure(key::services);
-        //.configure(routes::dump::services);
+            .configure(key::services)
+            .configure(dump::services);
         #[cfg(feature = "mini-dashboard")]
         let app = if $enable_frontend {
             let generated = dashboard::generate();
@@ -62,11 +62,11 @@ macro_rules! create_app {
         app.wrap(
             Cors::default()
-            .send_wildcard()
-            .allowed_headers(vec!["content-type", "x-meili-api-key"])
-            .allow_any_origin()
-            .allow_any_method()
-            .max_age(86_400) // 24h
+                .send_wildcard()
+                .allowed_headers(vec!["content-type", "x-meili-api-key"])
+                .allow_any_origin()
+                .allow_any_method()
+                .max_age(86_400), // 24h
         )
         .wrap(middleware::Logger::default())
         .wrap(middleware::Compress::default())
diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs
index 1997718cc..eb81ab9fd 100644
--- a/meilisearch-http/src/option.rs
+++ b/meilisearch-http/src/option.rs
@@ -202,10 +202,6 @@ pub struct Opt {
     #[structopt(long, conflicts_with = "import-snapshot")]
     pub import_dump: Option<PathBuf>,
 
-    /// The batch size used in the importation process, the bigger it is the faster the dump is created.
-    #[structopt(long, env = "MEILI_DUMP_BATCH_SIZE", default_value = "1024")]
-    pub dump_batch_size: usize,
-
     #[structopt(flatten)]
     pub indexer_options: IndexerOpts,
 }
diff --git a/meilisearch-http/src/routes/dump.rs b/meilisearch-http/src/routes/dump.rs
index c46b0e502..370eef509 100644
--- a/meilisearch-http/src/routes/dump.rs
+++ b/meilisearch-http/src/routes/dump.rs
@@ -1,25 +1,20 @@
-use std::fs::File;
-use std::path::Path;
-
-use actix_web::{get, post};
-use actix_web::{HttpResponse, web};
+use actix_web::HttpResponse;
+use actix_web::{get, post, web};
 use serde::{Deserialize, Serialize};
 
-use crate::dump::{DumpInfo, DumpStatus, compressed_dumps_dir, init_dump_process};
-use crate::Data;
-use crate::error::{Error, ResponseError};
+use crate::error::ResponseError;
 use crate::helpers::Authentication;
+use crate::Data;
 
 pub fn services(cfg: &mut web::ServiceConfig) {
-    cfg.service(trigger_dump)
-        .service(get_dump_status);
+    cfg.service(create_dump).service(get_dump_status);
 }
 
 #[post("/dumps", wrap = "Authentication::Private")]
-async fn trigger_dump(
-    data: web::Data<Data>,
-) -> Result<HttpResponse, ResponseError> {
-    todo!()
+async fn create_dump(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
+    let res = data.create_dump().await?;
+
+    Ok(HttpResponse::Accepted().json(res))
 }
 
 #[derive(Debug, Serialize)]
@@ -38,5 +33,7 @@ async fn get_dump_status(
     data: web::Data<Data>,
     path: web::Path<DumpParam>,
 ) -> Result<HttpResponse, ResponseError> {
-    todo!()
+    let res = data.dump_status(path.dump_uid.clone()).await?;
+
+    Ok(HttpResponse::Ok().json(res))
 }
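With these handlers in place the dump API is asynchronous from the client's point of view: `POST /dumps` answers `202 Accepted` immediately with a `DumpInfo` payload, and completion is checked by polling the status route. A hypothetical client sketch follows; `reqwest` is not a dependency of this PR, and the exact `/dumps/{uid}/status` path and `uid` field are assumptions inferred from the `dump_uid` path parameter rather than shown in this diff:

```rust
use serde_json::Value;

// Trigger a dump, then fetch its status once.
async fn trigger_and_poll(base: &str, api_key: &str) -> anyhow::Result<Value> {
    let client = reqwest::Client::new();

    // Both routes are wrapped in Authentication::Private, so the key is sent
    // via the x-meili-api-key header allowed by the CORS config above.
    let info: Value = client
        .post(format!("{}/dumps", base))
        .header("x-meili-api-key", api_key)
        .send()
        .await?
        .json()
        .await?;

    // The accepted response carries the uid used by the status route.
    let uid = info["uid"].as_str().unwrap_or_default();
    let status: Value = client
        .get(format!("{}/dumps/{}/status", base, uid))
        .header("x-meili-api-key", api_key)
        .send()
        .await?
        .json()
        .await?;

    Ok(status)
}
```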
diff --git a/meilisearch-http/src/routes/index.rs b/meilisearch-http/src/routes/index.rs
index 4424c8cfe..4dfe90abf 100644
--- a/meilisearch-http/src/routes/index.rs
+++ b/meilisearch-http/src/routes/index.rs
@@ -1,6 +1,7 @@
 use actix_web::{delete, get, post, put};
 use actix_web::{web, HttpResponse};
-use serde::Deserialize;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
 
 use crate::error::ResponseError;
 use crate::helpers::Authentication;
@@ -68,6 +69,16 @@ struct UpdateIndexRequest {
     primary_key: Option<String>,
 }
 
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateIndexResponse {
+    name: String,
+    uid: String,
+    created_at: DateTime<Utc>,
+    updated_at: DateTime<Utc>,
+    primary_key: Option<String>,
+}
+
 #[put("/indexes/{index_uid}", wrap = "Authentication::Private")]
 async fn update_index(
     data: web::Data<Data>,
diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs
index aaf13613a..a550064ba 100644
--- a/meilisearch-http/src/routes/mod.rs
+++ b/meilisearch-http/src/routes/mod.rs
@@ -2,6 +2,7 @@ use actix_web::{get, HttpResponse};
 use serde::{Deserialize, Serialize};
 
 pub mod document;
+pub mod dump;
 pub mod health;
 pub mod index;
 pub mod key;
@@ -9,7 +10,6 @@ pub mod search;
 pub mod settings;
 pub mod stats;
 pub mod synonym;
-//pub mod dump;
 
 #[derive(Deserialize)]
 pub struct IndexParam {
diff --git a/meilisearch-http/src/routes/settings/mod.rs b/meilisearch-http/src/routes/settings/mod.rs
index 03f1ee95c..8ede56046 100644
--- a/meilisearch-http/src/routes/settings/mod.rs
+++ b/meilisearch-http/src/routes/settings/mod.rs
@@ -1,9 +1,9 @@
 use actix_web::{delete, get, post, web, HttpResponse};
 
-use crate::{error::ResponseError, index::Unchecked};
 use crate::helpers::Authentication;
 use crate::index::Settings;
 use crate::Data;
+use crate::{error::ResponseError, index::Unchecked};
 
 #[macro_export]
 macro_rules! make_setting_route {
diff --git a/meilisearch-http/tests/common/index.rs b/meilisearch-http/tests/common/index.rs
index adb7fef3e..7d98d0733 100644
--- a/meilisearch-http/tests/common/index.rs
+++ b/meilisearch-http/tests/common/index.rs
@@ -47,7 +47,7 @@ impl Index<'_> {
         update_id as u64
     }
 
-    pub async fn create(& self, primary_key: Option<&str>) -> (Value, StatusCode) {
+    pub async fn create(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
         let body = json!({
             "uid": self.uid,
             "primaryKey": primary_key,
diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs
index 100722ec4..0fb801d7f 100644
--- a/meilisearch-http/tests/common/server.rs
+++ b/meilisearch-http/tests/common/server.rs
@@ -44,7 +44,7 @@ impl Server {
     }
 
     /// Returns a view to an index. There is no guarantee that the index exists.
-    pub fn index(& self, uid: impl AsRef<str>) -> Index<'_> {
+    pub fn index(&self, uid: impl AsRef<str>) -> Index<'_> {
         Index {
             uid: encode(uid.as_ref()),
             service: &self.service,
@@ -68,7 +68,6 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
     Opt {
         db_path: dir.as_ref().join("db"),
         dumps_dir: dir.as_ref().join("dump"),
-        dump_batch_size: 16,
         http_addr: "127.0.0.1:7700".to_owned(),
         master_key: None,
         env: "development".to_owned(),
diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs
index 4230e19f8..ab688076d 100644
--- a/meilisearch-http/tests/settings/get_settings.rs
+++ b/meilisearch-http/tests/settings/get_settings.rs
@@ -73,7 +73,7 @@ async fn reset_all_settings() {
     let server = Server::new().await;
     let index = server.index("test");
     index
-        .update_settings(json!({"displayedAttributes": ["foo"], "searchableAttributes": ["bar"], "stopWords": ["the"] }))
+        .update_settings(json!({"displayedAttributes": ["foo"], "searchableAttributes": ["bar"], "stopWords": ["the"], "attributesForFaceting": { "toto": "string" } }))
         .await;
     index.wait_update_id(0).await;
     let (response, code) = index.settings().await;
@@ -81,6 +81,7 @@ async fn reset_all_settings() {
     assert_eq!(response["displayedAttributes"], json!(["foo"]));
     assert_eq!(response["searchableAttributes"], json!(["bar"]));
     assert_eq!(response["stopWords"], json!(["the"]));
+    assert_eq!(response["attributesForFaceting"], json!({"toto": "string"}));
 
     index.delete_settings().await;
     index.wait_update_id(1).await;
@@ -90,6 +91,7 @@ async fn reset_all_settings() {
     assert_eq!(response["displayedAttributes"], json!(["*"]));
     assert_eq!(response["searchableAttributes"], json!(["*"]));
     assert_eq!(response["stopWords"], json!([]));
+    assert_eq!(response["attributesForFaceting"], json!({}));
 }
 
 #[actix_rt::test]
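A closing note on the option change: `--import-dump` stays in `Opt` while `dump_batch_size` is removed, so importing presumably runs through the new loaders directly rather than in configurable batches. A hypothetical wiring sketch for the uuid-resolver part (the function name is illustrative and `HeedUuidStore` is assumed in scope; the real call site lives in the index controller and is not part of this diff):

```rust
use std::path::Path;

// Replays `index_uuids/data.jsonl` from an extracted dump at `src` into a
// fresh LMDB environment under `dst`, via the loader added in store.rs.
fn import_index_uuids(src: &Path, dst: &Path) -> anyhow::Result<()> {
    HeedUuidStore::load_dump(src, dst)
}
```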