diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs
index 82a38cf96..8e1e48ebe 100644
--- a/meilisearch-http/src/index_controller/dump_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs
@@ -1,28 +1,27 @@
 use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus};
 use crate::helpers::compression;
 use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata};
+use async_stream::stream;
 use chrono::Utc;
-use log::{error, info, warn};
+use futures::stream::StreamExt;
+use log::{error, info};
 use std::{
     collections::HashSet,
     path::{Path, PathBuf},
     sync::Arc,
 };
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, oneshot, RwLock};
 use uuid::Uuid;
 
-pub struct DumpActor<UuidResolver, Index, Update> {
-    inbox: mpsc::Receiver<DumpMsg>,
-    inner: InnerDump<UuidResolver, Index, Update>,
-}
+pub const CONCURRENT_DUMP_MSG: usize = 10;
 
-#[derive(Clone)]
-struct InnerDump<UuidResolver, Index, Update> {
-    pub uuid_resolver: UuidResolver,
-    pub index: Index,
-    pub update: Update,
-    pub dump_path: PathBuf,
-    pub dump_info: Arc<Mutex<Option<DumpInfo>>>,
+pub struct DumpActor<UuidResolver, Index, Update> {
+    inbox: Option<mpsc::Receiver<DumpMsg>>,
+    uuid_resolver: UuidResolver,
+    index: Index,
+    update: Update,
+    dump_path: PathBuf,
+    dump_info: Arc<RwLock<Option<DumpInfo>>>,
 }
 
 /// Generate uid from creation date
@@ -44,144 +43,93 @@ where
         dump_path: impl AsRef<Path>,
     ) -> Self {
         Self {
-            inbox,
-            inner: InnerDump {
-                uuid_resolver,
-                index,
-                update,
-                dump_path: dump_path.as_ref().into(),
-                dump_info: Arc::new(Mutex::new(None)),
-            },
+            inbox: Some(inbox),
+            uuid_resolver,
+            index,
+            update,
+            dump_path: dump_path.as_ref().into(),
+            dump_info: Arc::new(RwLock::new(None)),
         }
     }
 
     pub async fn run(mut self) {
-        use DumpMsg::*;
-
         info!("Started dump actor.");
 
-        loop {
-            match self.inbox.recv().await {
-                Some(CreateDump { ret }) => {
-                    let _ = ret.send(self.inner.clone().handle_create_dump().await);
+        let mut inbox = self
+            .inbox
+            .take()
+            .expect("Dump Actor must have a inbox at this point.");
+
+        let stream = stream! {
+            loop {
+                match inbox.recv().await {
+                    Some(msg) => yield msg,
+                    None => break,
                 }
-                Some(DumpInfo { ret, uid }) => {
-                    let _ = ret.send(self.inner.handle_dump_info(uid).await);
-                }
-                None => break,
             }
-        }
+        };
+
+        stream
+            .for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg))
+            .await;
 
         error!("Dump actor stopped.");
     }
-}
 
-impl<UuidResolver, Index, Update> InnerDump<UuidResolver, Index, Update>
-where
-    UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
-    Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
-    Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
-{
-    async fn handle_create_dump(self) -> DumpResult<DumpInfo> {
+    async fn handle_message(&self, msg: DumpMsg) {
+        use DumpMsg::*;
+
+        match msg {
+            CreateDump { ret } => {
+                let _ = self.handle_create_dump(ret).await;
+            }
+            DumpInfo { ret, uid } => {
+                let _ = ret.send(self.handle_dump_info(uid).await);
+            }
+        }
+    }
+
+    async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
         if self.is_running().await {
-            return Err(DumpError::DumpAlreadyRunning);
+            ret.send(Err(DumpError::DumpAlreadyRunning))
+                .expect("Dump actor is dead");
+            return;
         }
         let uid = generate_uid();
         let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
-        *self.dump_info.lock().await = Some(info.clone());
+        *self.dump_info.write().await = Some(info.clone());
 
-        let this = self.clone();
+        ret.send(Ok(info)).expect("Dump actor is dead");
 
-        tokio::task::spawn(async move {
-            match this.perform_dump(uid).await {
-                Ok(()) => {
-                    if let Some(ref mut info) = *self.dump_info.lock().await {
-                        info.done();
-                    } else {
-                        warn!("dump actor was in an inconsistant state");
-                    }
-                    info!("Dump succeed");
-                }
-                Err(e) => {
-                    if let Some(ref mut info) = *self.dump_info.lock().await {
-                        info.with_error(e.to_string());
-                    } else {
-                        warn!("dump actor was in an inconsistant state");
-                    }
-                    error!("Dump failed: {}", e);
-                }
-            };
-        });
+        let dump_info = self.dump_info.clone();
 
-        Ok(info)
-    }
+        let task_result = tokio::task::spawn(perform_dump(
+            self.dump_path.clone(),
+            self.uuid_resolver.clone(),
+            self.index.clone(),
+            self.update.clone(),
+            uid.clone(),
+        ))
+        .await;
 
-    async fn perform_dump(self, uid: String) -> anyhow::Result<()> {
-        info!("Performing dump.");
-
-        let dump_dir = self.dump_path.clone();
-        tokio::fs::create_dir_all(&dump_dir).await?;
-        let temp_dump_dir =
-            tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
-        let temp_dump_path = temp_dump_dir.path().to_owned();
-
-        let uuids = self.uuid_resolver.list().await?;
-        // maybe we could just keep the vec as-is
-        let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
-
-        if uuids.is_empty() {
-            return Ok(());
-        }
-
-        let indexes = self.list_indexes().await?;
-
-        // we create one directory by index
-        for meta in indexes.iter() {
-            tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
-        }
-
-        let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
-        metadata.to_path(&temp_dump_path).await?;
-
-        self.update.dump(uuids, temp_dump_path.clone()).await?;
-
-        let dump_dir = self.dump_path.clone();
-        let dump_path = self.dump_path.join(format!("{}.dump", uid));
-        let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
-            let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
-            let temp_dump_file_path = temp_dump_file.path().to_owned();
-            compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
-            temp_dump_file.persist(&dump_path)?;
-            Ok(dump_path)
-        })
-        .await??;
-
-        info!("Created dump in {:?}.", dump_path);
-
-        Ok(())
-    }
-
-    async fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
-        let uuids = self.uuid_resolver.list().await?;
-
-        let mut ret = Vec::new();
-
-        for (uid, uuid) in uuids {
-            let meta = self.index.get_index_meta(uuid).await?;
-            let meta = IndexMetadata {
-                uuid,
-                name: uid.clone(),
-                uid,
-                meta,
-            };
-            ret.push(meta);
-        }
-
-        Ok(ret)
+        match task_result {
+            Ok(Ok(())) => {
+                (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").done();
+                info!("Dump succeed");
+            }
+            Ok(Err(e)) => {
+                (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error(e.to_string());
+                error!("Dump failed: {}", e);
+            }
+            Err(_) => {
+                error!("Dump panicked. Dump status set to failed");
+                *dump_info.write().await = Some(DumpInfo::new(uid, DumpStatus::Failed));
+            }
+        };
     }
 
     async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
-        match &*self.dump_info.lock().await {
+        match &*self.dump_info.read().await {
             None => self.dump_from_fs(uid).await,
             Some(DumpInfo { uid: ref s, .. }) if &uid != s => self.dump_from_fs(uid).await,
             Some(info) => Ok(info.clone()),
@@ -198,7 +146,7 @@ where
 
     async fn is_running(&self) -> bool {
         matches!(
-            *self.dump_info.lock().await,
+            *self.dump_info.read().await,
             Some(DumpInfo {
                 status: DumpStatus::InProgress,
                 ..
@@ -206,3 +154,85 @@ where
         )
     }
 }
+
+async fn perform_dump<UuidResolver, Index, Update>(
+    dump_path: PathBuf,
+    uuid_resolver: UuidResolver,
+    index: Index,
+    update: Update,
+    uid: String,
+) -> anyhow::Result<()>
+where
+    UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
+    Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
+    Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
+{
+    info!("Performing dump.");
+
+    let dump_dir = dump_path.clone();
+    tokio::fs::create_dir_all(&dump_dir).await?;
+    let temp_dump_dir =
+        tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
+    let temp_dump_path = temp_dump_dir.path().to_owned();
+
+    let uuids = uuid_resolver.list().await?;
+    // maybe we could just keep the vec as-is
+    let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
+
+    if uuids.is_empty() {
+        return Ok(());
+    }
+
+    let indexes = list_indexes(&uuid_resolver, &index).await?;
+
+    // we create one directory by index
+    for meta in indexes.iter() {
+        tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
+    }
+
+    let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
+    metadata.to_path(&temp_dump_path).await?;
+
+    update.dump(uuids, temp_dump_path.clone()).await?;
+
+    let dump_dir = dump_path.clone();
+    let dump_path = dump_path.join(format!("{}.dump", uid));
+    let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
+        let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
+        let temp_dump_file_path = temp_dump_file.path().to_owned();
+        compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
+        temp_dump_file.persist(&dump_path)?;
+        Ok(dump_path)
+    })
+    .await??;
+
+    info!("Created dump in {:?}.", dump_path);
+
+    Ok(())
+}
+
+async fn list_indexes<UuidResolver, Index>(
+    uuid_resolver: &UuidResolver,
+    index: &Index,
+) -> anyhow::Result<Vec<IndexMetadata>>
+where
+    UuidResolver: uuid_resolver::UuidResolverHandle,
+    Index: index_actor::IndexActorHandle,
+{
+    let uuids = uuid_resolver.list().await?;
+
+    let mut ret = Vec::new();
+
+    for (uid, uuid) in uuids {
+        let meta = index.get_index_meta(uuid).await?;
+        let meta = IndexMetadata {
+            uuid,
+            name: uid.clone(),
+            uid,
+            meta,
+        };
+        ret.push(meta);
+    }
+
+    Ok(ret)
+}
diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs
index 7d2e5a951..1508f8eb7 100644
--- a/meilisearch-http/src/index_controller/dump_actor/mod.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs
@@ -13,7 +13,6 @@ use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
 #[cfg(test)]
 use mockall::automock;
 use serde::{Deserialize, Serialize};
-use serde_json::json;
 use tempfile::TempDir;
 use thiserror::Error;
 use uuid::Uuid;
@@ -128,8 +127,8 @@ pub enum DumpStatus {
 pub struct DumpInfo {
     pub uid: String,
     pub status: DumpStatus,
-    #[serde(skip_serializing_if = "Option::is_none", flatten)]
-    pub error: Option<serde_json::Value>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
 }
 
 impl DumpInfo {
@@ -143,7 +142,7 @@ impl DumpInfo {
 
     pub fn with_error(&mut self, error: String) {
         self.status = DumpStatus::Failed;
-        self.error = Some(json!(error));
+        self.error = Some(error);
     }
 
     pub fn done(&mut self) {
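
Reviewer note: the core change above is that run() no longer handles one message at a time; it wraps its mpsc inbox in a stream with async_stream::stream! and drives it with for_each_concurrent, capped at CONCURRENT_DUMP_MSG. Below is a minimal, self-contained sketch of that pattern, built only on the crates this patch already pulls in (async-stream, futures, tokio). The Msg enum, handle_message body, and channel sizes are illustrative stand-ins, not code from the patch.

// Sketch only: bounded-concurrency actor loop mirroring the new run().
// `Msg` and `handle_message` are hypothetical; the real actor dispatches DumpMsg.
use async_stream::stream;
use futures::stream::StreamExt;
use tokio::sync::{mpsc, oneshot};

const CONCURRENT_MSG: usize = 10;

enum Msg {
    Ping { ret: oneshot::Sender<&'static str> },
}

async fn handle_message(msg: Msg) {
    match msg {
        // Each message becomes its own future; up to CONCURRENT_MSG run at once.
        Msg::Ping { ret } => {
            let _ = ret.send("pong");
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut inbox) = mpsc::channel::<Msg>(100);

    // Turn the mpsc receiver into a Stream, like the actor does with its inbox.
    let messages = stream! {
        while let Some(msg) = inbox.recv().await {
            yield msg;
        }
    };

    // Queue one message, then drop the sender so the stream terminates.
    let (ret, answer) = oneshot::channel();
    tx.send(Msg::Ping { ret }).await.unwrap();
    drop(tx);

    // Process messages with bounded concurrency instead of strictly in sequence.
    messages
        .for_each_concurrent(Some(CONCURRENT_MSG), |msg| handle_message(msg))
        .await;

    println!("{}", answer.await.unwrap());
}

A related design point visible in the diff: handle_create_dump now answers its oneshot channel before the dump finishes, and dump_info moves behind an RwLock, so concurrent DumpInfo queries only need read access while a dump is in progress.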