From efca63f9cea84d8b98255ea7b2a333dabf63f924 Mon Sep 17 00:00:00 2001 From: tamo Date: Mon, 10 May 2021 20:25:09 +0200 Subject: [PATCH] [WIP] rebase on main --- meilisearch-http/src/data/mod.rs | 11 +- .../src/index_controller/dump_actor/actor.rs | 200 ++++++++++++++++++ .../dump_actor/handle_impl.rs | 41 ++++ .../index_controller/dump_actor/message.rs | 15 ++ .../{dump => dump_actor}/mod.rs | 151 +++++++------ .../{dump => dump_actor}/v1.rs | 0 .../{dump => dump_actor}/v2.rs | 0 .../src/index_controller/index_actor/actor.rs | 4 +- meilisearch-http/src/index_controller/mod.rs | 27 ++- meilisearch-http/src/routes/dump.rs | 19 +- 10 files changed, 381 insertions(+), 87 deletions(-) create mode 100644 meilisearch-http/src/index_controller/dump_actor/actor.rs create mode 100644 meilisearch-http/src/index_controller/dump_actor/handle_impl.rs create mode 100644 meilisearch-http/src/index_controller/dump_actor/message.rs rename meilisearch-http/src/index_controller/{dump => dump_actor}/mod.rs (63%) rename meilisearch-http/src/index_controller/{dump => dump_actor}/v1.rs (100%) rename meilisearch-http/src/index_controller/{dump => dump_actor}/v2.rs (100%) diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index 39cfed626..008065d74 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -4,8 +4,7 @@ use std::sync::Arc; use sha2::Digest; use crate::index::{Checked, Settings}; -use crate::index_controller::{IndexController, IndexStats, Stats}; -use crate::index_controller::{IndexMetadata, IndexSettings}; +use crate::index_controller::{IndexController, IndexStats, Stats, DumpInfo, IndexMetadata, IndexSettings}; use crate::option::Opt; pub mod search; @@ -108,8 +107,12 @@ impl Data { Ok(self.index_controller.get_all_stats().await?) } - pub async fn dump(&self) -> anyhow::Result { - Ok(self.index_controller.dump(self.options.dumps_dir.clone()).await?) + pub async fn create_dump(&self) -> anyhow::Result { + Ok(self.index_controller.create_dump().await?) + } + + pub async fn dump_status(&self, uid: String) -> anyhow::Result { + Ok(self.index_controller.dump_info(uid).await?) } #[inline] diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs new file mode 100644 index 000000000..b41ddadcf --- /dev/null +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -0,0 +1,200 @@ +use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; +use crate::helpers::compression; +use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata}; +use chrono::Utc; +use log::{error, info, warn}; +use std::{ + collections::HashSet, + path::{Path, PathBuf}, + sync::Arc, +}; +use tokio::sync::{mpsc, Mutex}; +use uuid::Uuid; + +pub struct DumpActor { + inbox: mpsc::Receiver, + inner: InnerDump, +} + +#[derive(Clone)] +struct InnerDump { + pub uuid_resolver: UuidResolver, + pub index: Index, + pub update: Update, + pub dump_path: PathBuf, + pub dump_info: Arc>>, +} + +/// Generate uid from creation date +fn generate_uid() -> String { + Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() +} + +impl DumpActor +where + UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, + Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, + Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, +{ + pub fn new( + inbox: mpsc::Receiver, + uuid_resolver: UuidResolver, + index: Index, + update: Update, + dump_path: impl AsRef, + ) -> Self { + Self { + inbox, + inner: InnerDump { + uuid_resolver, + index, + update, + dump_path: dump_path.as_ref().into(), + dump_info: Arc::new(Mutex::new(None)), + }, + } + } + + pub async fn run(mut self) { + use DumpMsg::*; + + info!("Started dump actor."); + + loop { + match self.inbox.recv().await { + Some(CreateDump { ret }) => { + let _ = ret.send(self.inner.clone().handle_create_dump().await); + } + Some(DumpInfo { ret, uid }) => { + let _ = ret.send(self.inner.handle_dump_info(uid).await); + } + None => break, + } + } + + error!("Dump actor stopped."); + } +} + +impl InnerDump +where + UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, + Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, + Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, +{ + async fn handle_create_dump(self) -> DumpResult { + if self.is_running().await { + return Err(DumpError::DumpAlreadyRunning); + } + let uid = generate_uid(); + let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); + *self.dump_info.lock().await = Some(info.clone()); + + let this = self.clone(); + + tokio::task::spawn(async move { + match this.perform_dump(uid).await { + Ok(()) => { + if let Some(ref mut info) = *self.dump_info.lock().await { + info.done(); + } else { + warn!("dump actor was in an inconsistant state"); + } + info!("Dump succeed"); + } + Err(e) => { + if let Some(ref mut info) = *self.dump_info.lock().await { + info.with_error(e.to_string()); + } else { + warn!("dump actor was in an inconsistant state"); + } + error!("Dump failed: {}", e); + } + }; + }); + + Ok(info) + } + + async fn perform_dump(self, uid: String) -> anyhow::Result<()> { + info!("Performing dump."); + + let dump_dir = self.dump_path.clone(); + tokio::fs::create_dir_all(&dump_dir).await?; + let temp_dump_dir = + tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); + + let uuids = self.uuid_resolver.list().await?; + // maybe we could just keep the vec as-is + let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect(); + + if uuids.is_empty() { + return Ok(()); + } + + let indexes = self.list_indexes().await?; + + // we create one directory by index + for meta in indexes.iter() { + tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?; + } + + let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string()); + metadata.to_path(&temp_dump_path).await?; + + self.update.dump(uuids, temp_dump_path.clone()).await?; + + let dump_dir = self.dump_path.clone(); + let dump_path = self.dump_path.join(format!("{}.dump", uid)); + let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { + let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?; + let temp_dump_file_path = temp_dump_file.path().to_owned(); + compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?; + temp_dump_file.persist(&dump_path)?; + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + + Ok(()) + } + + async fn list_indexes(&self) -> anyhow::Result> { + let uuids = self.uuid_resolver.list().await?; + + let mut ret = Vec::new(); + + for (uid, uuid) in uuids { + let meta = self.index.get_index_meta(uuid).await?; + let meta = IndexMetadata { + uuid, + name: uid.clone(), + uid, + meta, + }; + ret.push(meta); + } + + Ok(ret) + } + + async fn handle_dump_info(&self, uid: String) -> DumpResult { + match &*self.dump_info.lock().await { + None => Err(DumpError::DumpDoesNotExist(uid)), + Some(DumpInfo { uid: ref s, .. }) if &uid != s => Err(DumpError::DumpDoesNotExist(uid)), + Some(info) => Ok(info.clone()), + } + } + + async fn is_running(&self) -> bool { + matches!( + *self.dump_info.lock().await, + Some(DumpInfo { + status: DumpStatus::InProgress, + .. + }) + ) + } +} diff --git a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs new file mode 100644 index 000000000..601c97c01 --- /dev/null +++ b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs @@ -0,0 +1,41 @@ +use std::path::{Path}; +use actix_web::web::Bytes; +use tokio::sync::{mpsc, oneshot}; +use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult}; + +#[derive(Clone)] +pub struct DumpActorHandleImpl { + sender: mpsc::Sender, +} + +#[async_trait::async_trait] +impl DumpActorHandle for DumpActorHandleImpl { + async fn create_dump(&self) -> DumpResult { + let (ret, receiver) = oneshot::channel(); + let msg = DumpMsg::CreateDump { ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("IndexActor has been killed") + } + + async fn dump_info(&self, uid: String) -> DumpResult { + let (ret, receiver) = oneshot::channel(); + let msg = DumpMsg::DumpInfo { ret, uid }; + let _ = self.sender.send(msg).await; + receiver.await.expect("IndexActor has been killed") + } +} + +impl DumpActorHandleImpl { + pub fn new( + path: impl AsRef, + uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl, + index: crate::index_controller::index_actor::IndexActorHandleImpl, + update: crate::index_controller::update_actor::UpdateActorHandleImpl, + ) -> anyhow::Result { + let (sender, receiver) = mpsc::channel(10); + let actor = DumpActor::new(receiver, uuid_resolver, index, update, path); + + tokio::task::spawn(actor.run()); + Ok(Self { sender }) + } +} diff --git a/meilisearch-http/src/index_controller/dump_actor/message.rs b/meilisearch-http/src/index_controller/dump_actor/message.rs new file mode 100644 index 000000000..14409afbb --- /dev/null +++ b/meilisearch-http/src/index_controller/dump_actor/message.rs @@ -0,0 +1,15 @@ +use tokio::sync::oneshot; + +use super::{DumpResult, DumpInfo}; + + +pub enum DumpMsg { + CreateDump { + ret: oneshot::Sender>, + }, + DumpInfo { + uid: String, + ret: oneshot::Sender>, + }, +} + diff --git a/meilisearch-http/src/index_controller/dump/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs similarity index 63% rename from meilisearch-http/src/index_controller/dump/mod.rs rename to meilisearch-http/src/index_controller/dump_actor/mod.rs index a44d4235b..f57c27c59 100644 --- a/meilisearch-http/src/index_controller/dump/mod.rs +++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs @@ -1,23 +1,48 @@ mod v1; mod v2; +mod handle_impl; +mod actor; +mod message; -use std::{collections::HashSet, fs::{File}, path::{Path, PathBuf}, sync::Arc}; +use std::{ + fs::File, + path::Path, + sync::Arc, +}; +#[cfg(test)] +use mockall::automock; use anyhow::bail; -use chrono::Utc; +use thiserror::Error; use heed::EnvOpenOptions; use log::{error, info}; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use serde::{Deserialize, Serialize}; +use serde_json::json; use tempfile::TempDir; -use tokio::task::spawn_blocking; -use tokio::fs; -use uuid::Uuid; -use super::{IndexController, IndexMetadata, update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle}; +use super::IndexMetadata; +use crate::helpers::compression; use crate::index::Index; use crate::index_controller::uuid_resolver; -use crate::helpers::compression; + +pub use handle_impl::*; +pub use actor::DumpActor; +pub use message::DumpMsg; + +pub type DumpResult = std::result::Result; + +#[derive(Error, Debug)] +pub enum DumpError { + #[error("error with index: {0}")] + Error(#[from] anyhow::Error), + #[error("Heed error: {0}")] + HeedError(#[from] heed::Error), + #[error("dump already running")] + DumpAlreadyRunning, + #[error("dump `{0}` does not exist")] + DumpDoesNotExist(String), +} #[derive(Debug, Serialize, Deserialize, Copy, Clone)] enum DumpVersion { @@ -29,7 +54,12 @@ impl DumpVersion { const CURRENT: Self = Self::V2; /// Select the good importation function from the `DumpVersion` of metadata - pub fn import_index(self, size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> { + pub fn import_index( + self, + size: usize, + dump_path: &Path, + index_path: &Path, + ) -> anyhow::Result<()> { match self { Self::V1 => v1::import_index(size, dump_path, index_path), Self::V2 => v2::import_index(size, dump_path, index_path), @@ -37,6 +67,19 @@ impl DumpVersion { } } +#[async_trait::async_trait] +#[cfg_attr(test, automock)] +pub trait DumpActorHandle { + /// Start the creation of a dump + /// Implementation: [handle_impl::DumpActorHandleImpl::create_dump] + async fn create_dump(&self) -> DumpResult; + + /// Return the status of an already created dump + /// Implementation: [handle_impl::DumpActorHandleImpl::dump_status] + async fn dump_info(&self, uid: String) -> DumpResult; +} + + #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Metadata { @@ -74,66 +117,46 @@ impl Metadata { } } -/// Generate uid from creation date -fn generate_uid() -> String { - Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[serde(rename_all = "snake_case")] +pub enum DumpStatus { + Done, + InProgress, + Failed, } -pub async fn perform_dump(index_controller: &IndexController, dump_path: PathBuf) -> anyhow::Result { - info!("Performing dump."); +#[derive(Debug, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct DumpInfo { + pub uid: String, + pub status: DumpStatus, + #[serde(skip_serializing_if = "Option::is_none", flatten)] + pub error: Option, +} - let dump_dir = dump_path.clone(); - let uid = generate_uid(); - fs::create_dir_all(&dump_dir).await?; - let temp_dump_dir = spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??; - let temp_dump_path = temp_dump_dir.path().to_owned(); - - let uuids = index_controller.uuid_resolver.list().await?; - // maybe we could just keep the vec as-is - let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect(); - - if uuids.is_empty() { - return Ok(uid); +impl DumpInfo { + pub fn new(uid: String, status: DumpStatus) -> Self { + Self { + uid, + status, + error: None, + } } - let indexes = index_controller.list_indexes().await?; - - // we create one directory by index - for meta in indexes.iter() { - tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?; + pub fn with_error(&mut self, error: String) { + self.status = DumpStatus::Failed; + self.error = Some(json!(error)); } - let metadata = Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string()); - metadata.to_path(&temp_dump_path).await?; + pub fn done(&mut self) { + self.status = DumpStatus::Done; + } - index_controller.update_handle.dump(uuids, temp_dump_path.clone()).await?; - let dump_dir = dump_path.clone(); - let dump_path = dump_path.join(format!("{}.dump", uid)); - let dump_path = spawn_blocking(move || -> anyhow::Result { - let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?; - let temp_dump_file_path = temp_dump_file.path().to_owned(); - compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?; - temp_dump_file.persist(&dump_path)?; - Ok(dump_path) - }) - .await??; - - info!("Created dump in {:?}.", dump_path); - - Ok(uid) + pub fn dump_already_in_progress(&self) -> bool { + self.status == DumpStatus::InProgress + } } -/* -/// Write Settings in `settings.json` file at provided `dir_path` -fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> { -let path = dir_path.join("settings.json"); -let file = File::create(path)?; - -serde_json::to_writer(file, settings)?; - -Ok(()) -} -*/ pub fn load_dump( db_path: impl AsRef, @@ -185,12 +208,18 @@ pub fn load_dump( let index_path = db_path.join(&format!("indexes/index-{}", uuid)); // let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db - info!("Importing dump from {} into {}...", dump_path.display(), index_path.display()); - metadata.dump_version.import_index(size, &dump_path, &index_path).unwrap(); + info!( + "Importing dump from {} into {}...", + dump_path.display(), + index_path.display() + ); + metadata + .dump_version + .import_index(size, &dump_path, &index_path) + .unwrap(); info!("Dump importation from {} succeed", dump_path.display()); } - info!("Dump importation from {} succeed", dump_path.display()); Ok(()) } diff --git a/meilisearch-http/src/index_controller/dump/v1.rs b/meilisearch-http/src/index_controller/dump_actor/v1.rs similarity index 100% rename from meilisearch-http/src/index_controller/dump/v1.rs rename to meilisearch-http/src/index_controller/dump_actor/v1.rs diff --git a/meilisearch-http/src/index_controller/dump/v2.rs b/meilisearch-http/src/index_controller/dump_actor/v2.rs similarity index 100% rename from meilisearch-http/src/index_controller/dump/v2.rs rename to meilisearch-http/src/index_controller/dump_actor/v2.rs diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 623b42ddc..ca23663b7 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -315,8 +315,8 @@ impl IndexActor { /// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the /// documents and all the settings. async fn handle_dump(&self, uid: &str, uuid: Uuid, path: PathBuf) -> IndexResult<()> { - use tokio::fs::create_dir_all; use std::io::prelude::*; + use tokio::fs::create_dir_all; create_dir_all(&path) .await @@ -348,7 +348,6 @@ impl IndexActor { file.write_all(b"\n")?; } - // then we dump all the settings let file = File::create(settings_path)?; let mut file = std::io::BufWriter::new(file); @@ -357,7 +356,6 @@ impl IndexActor { file.write_all(serde_json::to_string(&settings)?.as_bytes())?; file.write_all(b"\n")?; - Ok(()) }) .await diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 6ea42c73d..d1bb5e170 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, path::PathBuf}; +use std::collections::BTreeMap; use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -15,6 +15,8 @@ use tokio::time::sleep; use uuid::Uuid; pub use updates::*; +pub use dump_actor::{DumpInfo, DumpStatus}; +use dump_actor::DumpActorHandle; use index_actor::IndexActorHandle; use snapshot::{SnapshotService, load_snapshot}; use update_actor::UpdateActorHandle; @@ -23,11 +25,11 @@ use uuid_resolver::{UuidError, UuidResolverHandle}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::option::Opt; -use self::dump::load_dump; +use dump_actor::load_dump; mod index_actor; mod snapshot; -mod dump; +mod dump_actor; mod update_actor; mod update_handler; mod updates; @@ -63,10 +65,12 @@ pub struct IndexStats { pub fields_distribution: FieldsDistribution, } +#[derive(Clone)] pub struct IndexController { uuid_resolver: uuid_resolver::UuidResolverHandleImpl, index_handle: index_actor::IndexActorHandleImpl, update_handle: update_actor::UpdateActorHandleImpl, + dump_handle: dump_actor::DumpActorHandleImpl, } #[derive(Serialize)] @@ -108,6 +112,7 @@ impl IndexController { &path, update_store_size, )?; + let dump_handle = dump_actor::DumpActorHandleImpl::new(&options.dumps_dir, uuid_resolver.clone(), index_handle.clone(), update_handle.clone())?; if options.schedule_snapshot { let snapshot_service = SnapshotService::new( @@ -129,6 +134,7 @@ impl IndexController { uuid_resolver, index_handle, update_handle, + dump_handle, }) } @@ -378,13 +384,6 @@ impl IndexController { Ok(stats) } - pub async fn dump(&self, path: PathBuf) -> anyhow::Result { - eprintln!("index_controller::mod called"); - let res = dump::perform_dump(self, path).await?; - eprintln!("index_controller::mod finished"); - Ok(res) - } - pub async fn get_all_stats(&self) -> anyhow::Result { let update_infos = self.update_handle.get_info().await?; let mut database_size = self.get_uuids_size().await? + update_infos.size; @@ -410,6 +409,14 @@ impl IndexController { indexes, }) } + + pub async fn create_dump(&self) -> anyhow::Result { + Ok(self.dump_handle.create_dump().await?) + } + + pub async fn dump_info(&self, uid: String) -> anyhow::Result { + Ok(self.dump_handle.dump_info(uid).await?) + } } pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { diff --git a/meilisearch-http/src/routes/dump.rs b/meilisearch-http/src/routes/dump.rs index 410b817b8..e6be4ca93 100644 --- a/meilisearch-http/src/routes/dump.rs +++ b/meilisearch-http/src/routes/dump.rs @@ -7,18 +7,17 @@ use crate::helpers::Authentication; use crate::Data; pub fn services(cfg: &mut web::ServiceConfig) { - cfg.service(trigger_dump) + cfg.service(create_dump) .service(get_dump_status); } #[post("/dumps", wrap = "Authentication::Private")] -async fn trigger_dump( +async fn create_dump( data: web::Data, ) -> Result { - eprintln!("dump started"); - let res = data.dump().await?; + let res = data.create_dump().await?; - Ok(HttpResponse::Ok().body(res)) + Ok(HttpResponse::Ok().json(res)) } #[derive(Debug, Serialize)] @@ -29,13 +28,15 @@ struct DumpStatusResponse { #[derive(Deserialize)] struct DumpParam { - _dump_uid: String, + dump_uid: String, } #[get("/dumps/{dump_uid}/status", wrap = "Authentication::Private")] async fn get_dump_status( - _data: web::Data, - _path: web::Path, + data: web::Data, + path: web::Path, ) -> Result { - todo!() + let res = data.dump_status(path.dump_uid.clone()).await?; + + Ok(HttpResponse::Ok().json(res)) }