From 529f7962f46873fcdce04125e15da5c485e1a929 Mon Sep 17 00:00:00 2001 From: tamo Date: Mon, 24 May 2021 15:42:12 +0200 Subject: [PATCH 1/8] handle parallel requests for the dump actor --- .../src/index_controller/dump_actor/actor.rs | 47 ++++++++++++++----- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index 82a38cf96..fac67cbc0 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -1,7 +1,9 @@ use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; use crate::helpers::compression; use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata}; +use async_stream::stream; use chrono::Utc; +use futures::stream::StreamExt; use log::{error, info, warn}; use std::{ collections::HashSet, @@ -11,8 +13,10 @@ use std::{ use tokio::sync::{mpsc, Mutex}; use uuid::Uuid; +pub const CONCURRENT_DUMP_MSG: usize = 10; + pub struct DumpActor { - inbox: mpsc::Receiver, + inbox: Option>, inner: InnerDump, } @@ -44,7 +48,7 @@ where dump_path: impl AsRef, ) -> Self { Self { - inbox, + inbox: Some(inbox), inner: InnerDump { uuid_resolver, index, @@ -56,24 +60,41 @@ where } pub async fn run(mut self) { - use DumpMsg::*; - info!("Started dump actor."); - loop { - match self.inbox.recv().await { - Some(CreateDump { ret }) => { - let _ = ret.send(self.inner.clone().handle_create_dump().await); + let mut inbox = self + .inbox + .take() + .expect("Dump Actor must have a inbox at this point."); + + let stream = stream! { + loop { + match inbox.recv().await { + Some(msg) => yield msg, + None => break, } - Some(DumpInfo { ret, uid }) => { - let _ = ret.send(self.inner.handle_dump_info(uid).await); - } - None => break, } - } + }; + + stream + .for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg)) + .await; error!("Dump actor stopped."); } + + async fn handle_message(&self, msg: DumpMsg) { + use DumpMsg::*; + + match msg { + CreateDump { ret } => { + let _ = ret.send(self.inner.clone().handle_create_dump().await); + } + DumpInfo { ret, uid } => { + let _ = ret.send(self.inner.handle_dump_info(uid).await); + } + } + } } impl InnerDump From dcf29e10816ef4b8e48677062206fcfaa889b878 Mon Sep 17 00:00:00 2001 From: tamo Date: Mon, 24 May 2021 17:33:42 +0200 Subject: [PATCH 2/8] fix the error handling in case there is a panic while creating a dump --- .../src/index_controller/dump_actor/actor.rs | 59 +++++++++++-------- .../src/index_controller/dump_actor/mod.rs | 5 +- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index fac67cbc0..c10cd90b8 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -10,7 +10,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, oneshot, Mutex}; use uuid::Uuid; pub const CONCURRENT_DUMP_MSG: usize = 10; @@ -88,7 +88,7 @@ where match msg { CreateDump { ret } => { - let _ = ret.send(self.inner.clone().handle_create_dump().await); + let _ = self.inner.clone().handle_create_dump(ret).await; } DumpInfo { ret, uid } => { let _ = ret.send(self.inner.handle_dump_info(uid).await); @@ -103,38 +103,45 @@ where Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, { - async fn handle_create_dump(self) -> DumpResult { + async fn handle_create_dump(self, ret: oneshot::Sender>) { if self.is_running().await { - return Err(DumpError::DumpAlreadyRunning); + ret.send(Err(DumpError::DumpAlreadyRunning)) + .expect("Dump actor is dead"); + return; } let uid = generate_uid(); let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); *self.dump_info.lock().await = Some(info.clone()); - let this = self.clone(); + ret.send(Ok(info)).expect("Dump actor is dead"); - tokio::task::spawn(async move { - match this.perform_dump(uid).await { - Ok(()) => { - if let Some(ref mut info) = *self.dump_info.lock().await { - info.done(); - } else { - warn!("dump actor was in an inconsistant state"); - } - info!("Dump succeed"); - } - Err(e) => { - if let Some(ref mut info) = *self.dump_info.lock().await { - info.with_error(e.to_string()); - } else { - warn!("dump actor was in an inconsistant state"); - } - error!("Dump failed: {}", e); - } - }; - }); + let dump_info = self.dump_info.clone(); + let cloned_uid = uid.clone(); - Ok(info) + let task_result = tokio::task::spawn(self.clone().perform_dump(cloned_uid)).await; + + match task_result { + Ok(Ok(())) => { + if let Some(ref mut info) = *dump_info.lock().await { + info.done(); + } else { + warn!("dump actor was in an inconsistant state"); + } + info!("Dump succeed"); + } + Ok(Err(e)) => { + if let Some(ref mut info) = *dump_info.lock().await { + info.with_error(e.to_string()); + } else { + warn!("dump actor was in an inconsistant state"); + } + error!("Dump failed: {}", e); + } + Err(_) => { + error!("Dump panicked. Dump status set to failed"); + *dump_info.lock().await = Some(DumpInfo::new(uid, DumpStatus::Failed)); + } + }; } async fn perform_dump(self, uid: String) -> anyhow::Result<()> { diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs index 7d2e5a951..ea0d7adbd 100644 --- a/meilisearch-http/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs @@ -13,7 +13,6 @@ use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; #[cfg(test)] use mockall::automock; use serde::{Deserialize, Serialize}; -use serde_json::json; use tempfile::TempDir; use thiserror::Error; use uuid::Uuid; @@ -129,7 +128,7 @@ pub struct DumpInfo { pub uid: String, pub status: DumpStatus, #[serde(skip_serializing_if = "Option::is_none", flatten)] - pub error: Option, + pub error: Option, } impl DumpInfo { @@ -143,7 +142,7 @@ impl DumpInfo { pub fn with_error(&mut self, error: String) { self.status = DumpStatus::Failed; - self.error = Some(json!(error)); + self.error = Some(error); } pub fn done(&mut self) { From 912f0286b332d7b3e1a1840386fcbddc989665a2 Mon Sep 17 00:00:00 2001 From: tamo Date: Mon, 24 May 2021 18:06:20 +0200 Subject: [PATCH 3/8] remove the dump_inner trickery --- .../src/index_controller/dump_actor/actor.rs | 196 +++++++++--------- 1 file changed, 103 insertions(+), 93 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index c10cd90b8..39d095e9f 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -17,16 +17,11 @@ pub const CONCURRENT_DUMP_MSG: usize = 10; pub struct DumpActor { inbox: Option>, - inner: InnerDump, -} - -#[derive(Clone)] -struct InnerDump { - pub uuid_resolver: UuidResolver, - pub index: Index, - pub update: Update, - pub dump_path: PathBuf, - pub dump_info: Arc>>, + uuid_resolver: UuidResolver, + index: Index, + update: Update, + dump_path: PathBuf, + dump_info: Arc>>, } /// Generate uid from creation date @@ -49,13 +44,11 @@ where ) -> Self { Self { inbox: Some(inbox), - inner: InnerDump { - uuid_resolver, - index, - update, - dump_path: dump_path.as_ref().into(), - dump_info: Arc::new(Mutex::new(None)), - }, + uuid_resolver, + index, + update, + dump_path: dump_path.as_ref().into(), + dump_info: Arc::new(Mutex::new(None)), } } @@ -88,22 +81,15 @@ where match msg { CreateDump { ret } => { - let _ = self.inner.clone().handle_create_dump(ret).await; + let _ = self.handle_create_dump(ret).await; } DumpInfo { ret, uid } => { - let _ = ret.send(self.inner.handle_dump_info(uid).await); + let _ = ret.send(self.handle_dump_info(uid).await); } } } -} -impl InnerDump -where - UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, - Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, - Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, -{ - async fn handle_create_dump(self, ret: oneshot::Sender>) { + async fn handle_create_dump(&self, ret: oneshot::Sender>) { if self.is_running().await { ret.send(Err(DumpError::DumpAlreadyRunning)) .expect("Dump actor is dead"); @@ -116,9 +102,15 @@ where ret.send(Ok(info)).expect("Dump actor is dead"); let dump_info = self.dump_info.clone(); - let cloned_uid = uid.clone(); - let task_result = tokio::task::spawn(self.clone().perform_dump(cloned_uid)).await; + let task_result = tokio::task::spawn(perform_dump( + self.dump_path.clone(), + self.uuid_resolver.clone(), + self.index.clone(), + self.update.clone(), + uid.clone(), + )) + .await; match task_result { Ok(Ok(())) => { @@ -144,70 +136,6 @@ where }; } - async fn perform_dump(self, uid: String) -> anyhow::Result<()> { - info!("Performing dump."); - - let dump_dir = self.dump_path.clone(); - tokio::fs::create_dir_all(&dump_dir).await?; - let temp_dump_dir = - tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??; - let temp_dump_path = temp_dump_dir.path().to_owned(); - - let uuids = self.uuid_resolver.list().await?; - // maybe we could just keep the vec as-is - let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect(); - - if uuids.is_empty() { - return Ok(()); - } - - let indexes = self.list_indexes().await?; - - // we create one directory by index - for meta in indexes.iter() { - tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?; - } - - let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string()); - metadata.to_path(&temp_dump_path).await?; - - self.update.dump(uuids, temp_dump_path.clone()).await?; - - let dump_dir = self.dump_path.clone(); - let dump_path = self.dump_path.join(format!("{}.dump", uid)); - let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { - let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?; - let temp_dump_file_path = temp_dump_file.path().to_owned(); - compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?; - temp_dump_file.persist(&dump_path)?; - Ok(dump_path) - }) - .await??; - - info!("Created dump in {:?}.", dump_path); - - Ok(()) - } - - async fn list_indexes(&self) -> anyhow::Result> { - let uuids = self.uuid_resolver.list().await?; - - let mut ret = Vec::new(); - - for (uid, uuid) in uuids { - let meta = self.index.get_index_meta(uuid).await?; - let meta = IndexMetadata { - uuid, - name: uid.clone(), - uid, - meta, - }; - ret.push(meta); - } - - Ok(ret) - } - async fn handle_dump_info(&self, uid: String) -> DumpResult { match &*self.dump_info.lock().await { None => self.dump_from_fs(uid).await, @@ -234,3 +162,85 @@ where ) } } + +async fn perform_dump( + dump_path: PathBuf, + uuid_resolver: UuidResolver, + index: Index, + update: Update, + uid: String, +) -> anyhow::Result<()> +where + UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, + Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, + Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, +{ + info!("Performing dump."); + + let dump_dir = dump_path.clone(); + tokio::fs::create_dir_all(&dump_dir).await?; + let temp_dump_dir = + tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); + + let uuids = uuid_resolver.list().await?; + // maybe we could just keep the vec as-is + let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect(); + + if uuids.is_empty() { + return Ok(()); + } + + let indexes = list_indexes(&uuid_resolver, &index).await?; + + // we create one directory by index + for meta in indexes.iter() { + tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?; + } + + let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string()); + metadata.to_path(&temp_dump_path).await?; + + update.dump(uuids, temp_dump_path.clone()).await?; + + let dump_dir = dump_path.clone(); + let dump_path = dump_path.join(format!("{}.dump", uid)); + let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { + let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?; + let temp_dump_file_path = temp_dump_file.path().to_owned(); + compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?; + temp_dump_file.persist(&dump_path)?; + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + + Ok(()) +} + +async fn list_indexes( + uuid_resolver: &UuidResolver, + index: &Index, +) -> anyhow::Result> +where + UuidResolver: uuid_resolver::UuidResolverHandle, + Index: index_actor::IndexActorHandle, +{ + let uuids = uuid_resolver.list().await?; + + let mut ret = Vec::new(); + + for (uid, uuid) in uuids { + let meta = index.get_index_meta(uuid).await?; + let meta = IndexMetadata { + uuid, + name: uid.clone(), + uid, + meta, + }; + ret.push(meta); + } + + Ok(ret) +} From 49a0e8aa19b5bdf27e3093f3075f461b6047b173 Mon Sep 17 00:00:00 2001 From: tamo Date: Mon, 24 May 2021 18:19:34 +0200 Subject: [PATCH 4/8] use a RwLock instead of a Mutex --- .../src/index_controller/dump_actor/actor.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index 39d095e9f..248526723 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -10,7 +10,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; pub const CONCURRENT_DUMP_MSG: usize = 10; @@ -21,7 +21,7 @@ pub struct DumpActor { index: Index, update: Update, dump_path: PathBuf, - dump_info: Arc>>, + dump_info: Arc>>, } /// Generate uid from creation date @@ -48,7 +48,7 @@ where index, update, dump_path: dump_path.as_ref().into(), - dump_info: Arc::new(Mutex::new(None)), + dump_info: Arc::new(RwLock::new(None)), } } @@ -97,7 +97,7 @@ where } let uid = generate_uid(); let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); - *self.dump_info.lock().await = Some(info.clone()); + *self.dump_info.write().await = Some(info.clone()); ret.send(Ok(info)).expect("Dump actor is dead"); @@ -114,7 +114,7 @@ where match task_result { Ok(Ok(())) => { - if let Some(ref mut info) = *dump_info.lock().await { + if let Some(ref mut info) = *dump_info.write().await { info.done(); } else { warn!("dump actor was in an inconsistant state"); @@ -122,7 +122,7 @@ where info!("Dump succeed"); } Ok(Err(e)) => { - if let Some(ref mut info) = *dump_info.lock().await { + if let Some(ref mut info) = *dump_info.write().await { info.with_error(e.to_string()); } else { warn!("dump actor was in an inconsistant state"); @@ -131,13 +131,13 @@ where } Err(_) => { error!("Dump panicked. Dump status set to failed"); - *dump_info.lock().await = Some(DumpInfo::new(uid, DumpStatus::Failed)); + *dump_info.write().await = Some(DumpInfo::new(uid, DumpStatus::Failed)); } }; } async fn handle_dump_info(&self, uid: String) -> DumpResult { - match &*self.dump_info.lock().await { + match &*self.dump_info.read().await { None => self.dump_from_fs(uid).await, Some(DumpInfo { uid: ref s, .. }) if &uid != s => self.dump_from_fs(uid).await, Some(info) => Ok(info.clone()), @@ -154,7 +154,7 @@ where async fn is_running(&self) -> bool { matches!( - *self.dump_info.lock().await, + *self.dump_info.read().await, Some(DumpInfo { status: DumpStatus::InProgress, .. From 991d8e1ec618b8e4aa7861041dfad182be4c7b52 Mon Sep 17 00:00:00 2001 From: tamo Date: Tue, 25 May 2021 10:48:57 +0200 Subject: [PATCH 5/8] fix the error printing --- meilisearch-http/src/index_controller/dump_actor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs index ea0d7adbd..1508f8eb7 100644 --- a/meilisearch-http/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs @@ -127,7 +127,7 @@ pub enum DumpStatus { pub struct DumpInfo { pub uid: String, pub status: DumpStatus, - #[serde(skip_serializing_if = "Option::is_none", flatten)] + #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, } From fe260f13309897655de5c0cea3506016a4658b20 Mon Sep 17 00:00:00 2001 From: Irevoire Date: Tue, 25 May 2021 15:13:47 +0200 Subject: [PATCH 6/8] Update meilisearch-http/src/index_controller/dump_actor/actor.rs Co-authored-by: marin --- meilisearch-http/src/index_controller/dump_actor/actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index 248526723..981b4236d 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -117,7 +117,7 @@ where if let Some(ref mut info) = *dump_info.write().await { info.done(); } else { - warn!("dump actor was in an inconsistant state"); + warn!("Dump actor is in an inconsistent state"); } info!("Dump succeed"); } From 1a6dcec83a517475869700c2b42fee280adaf2fc Mon Sep 17 00:00:00 2001 From: tamo Date: Tue, 25 May 2021 15:23:13 +0200 Subject: [PATCH 7/8] crash when the actor have no inbox --- .../src/index_controller/dump_actor/actor.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index 248526723..12afb4558 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -4,7 +4,7 @@ use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMet use async_stream::stream; use chrono::Utc; use futures::stream::StreamExt; -use log::{error, info, warn}; +use log::{error, info}; use std::{ collections::HashSet, path::{Path, PathBuf}, @@ -114,19 +114,11 @@ where match task_result { Ok(Ok(())) => { - if let Some(ref mut info) = *dump_info.write().await { - info.done(); - } else { - warn!("dump actor was in an inconsistant state"); - } + (*dump_info.write().await).as_mut().expect("Dump actor should have an inbox").done(); info!("Dump succeed"); } Ok(Err(e)) => { - if let Some(ref mut info) = *dump_info.write().await { - info.with_error(e.to_string()); - } else { - warn!("dump actor was in an inconsistant state"); - } + (*dump_info.write().await).as_mut().expect("Dump actor should have an inbox").with_error(e.to_string()); error!("Dump failed: {}", e); } Err(_) => { From 89846d1656183d1f2523dd228f6f6a921e9e084d Mon Sep 17 00:00:00 2001 From: tamo Date: Tue, 25 May 2021 15:47:57 +0200 Subject: [PATCH 8/8] improve panic message --- meilisearch-http/src/index_controller/dump_actor/actor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index 12afb4558..8e1e48ebe 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -114,11 +114,11 @@ where match task_result { Ok(Ok(())) => { - (*dump_info.write().await).as_mut().expect("Dump actor should have an inbox").done(); + (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").done(); info!("Dump succeed"); } Ok(Err(e)) => { - (*dump_info.write().await).as_mut().expect("Dump actor should have an inbox").with_error(e.to_string()); + (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error(e.to_string()); error!("Dump failed: {}", e); } Err(_) => {