From 912f0286b332d7b3e1a1840386fcbddc989665a2 Mon Sep 17 00:00:00 2001 From: tamo Date: Mon, 24 May 2021 18:06:20 +0200 Subject: [PATCH] remove the dump_inner trickery --- .../src/index_controller/dump_actor/actor.rs | 196 +++++++++--------- 1 file changed, 103 insertions(+), 93 deletions(-) diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index c10cd90b8..39d095e9f 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -17,16 +17,11 @@ pub const CONCURRENT_DUMP_MSG: usize = 10; pub struct DumpActor { inbox: Option>, - inner: InnerDump, -} - -#[derive(Clone)] -struct InnerDump { - pub uuid_resolver: UuidResolver, - pub index: Index, - pub update: Update, - pub dump_path: PathBuf, - pub dump_info: Arc>>, + uuid_resolver: UuidResolver, + index: Index, + update: Update, + dump_path: PathBuf, + dump_info: Arc>>, } /// Generate uid from creation date @@ -49,13 +44,11 @@ where ) -> Self { Self { inbox: Some(inbox), - inner: InnerDump { - uuid_resolver, - index, - update, - dump_path: dump_path.as_ref().into(), - dump_info: Arc::new(Mutex::new(None)), - }, + uuid_resolver, + index, + update, + dump_path: dump_path.as_ref().into(), + dump_info: Arc::new(Mutex::new(None)), } } @@ -88,22 +81,15 @@ where match msg { CreateDump { ret } => { - let _ = self.inner.clone().handle_create_dump(ret).await; + let _ = self.handle_create_dump(ret).await; } DumpInfo { ret, uid } => { - let _ = ret.send(self.inner.handle_dump_info(uid).await); + let _ = ret.send(self.handle_dump_info(uid).await); } } } -} -impl InnerDump -where - UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, - Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, - Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, -{ - async fn handle_create_dump(self, ret: oneshot::Sender>) { + async fn handle_create_dump(&self, ret: oneshot::Sender>) { if self.is_running().await { ret.send(Err(DumpError::DumpAlreadyRunning)) .expect("Dump actor is dead"); @@ -116,9 +102,15 @@ where ret.send(Ok(info)).expect("Dump actor is dead"); let dump_info = self.dump_info.clone(); - let cloned_uid = uid.clone(); - let task_result = tokio::task::spawn(self.clone().perform_dump(cloned_uid)).await; + let task_result = tokio::task::spawn(perform_dump( + self.dump_path.clone(), + self.uuid_resolver.clone(), + self.index.clone(), + self.update.clone(), + uid.clone(), + )) + .await; match task_result { Ok(Ok(())) => { @@ -144,70 +136,6 @@ where }; } - async fn perform_dump(self, uid: String) -> anyhow::Result<()> { - info!("Performing dump."); - - let dump_dir = self.dump_path.clone(); - tokio::fs::create_dir_all(&dump_dir).await?; - let temp_dump_dir = - tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??; - let temp_dump_path = temp_dump_dir.path().to_owned(); - - let uuids = self.uuid_resolver.list().await?; - // maybe we could just keep the vec as-is - let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect(); - - if uuids.is_empty() { - return Ok(()); - } - - let indexes = self.list_indexes().await?; - - // we create one directory by index - for meta in indexes.iter() { - tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?; - } - - let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string()); - metadata.to_path(&temp_dump_path).await?; - - self.update.dump(uuids, temp_dump_path.clone()).await?; - - let dump_dir = self.dump_path.clone(); - let dump_path = self.dump_path.join(format!("{}.dump", uid)); - let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { - let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?; - let temp_dump_file_path = temp_dump_file.path().to_owned(); - compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?; - temp_dump_file.persist(&dump_path)?; - Ok(dump_path) - }) - .await??; - - info!("Created dump in {:?}.", dump_path); - - Ok(()) - } - - async fn list_indexes(&self) -> anyhow::Result> { - let uuids = self.uuid_resolver.list().await?; - - let mut ret = Vec::new(); - - for (uid, uuid) in uuids { - let meta = self.index.get_index_meta(uuid).await?; - let meta = IndexMetadata { - uuid, - name: uid.clone(), - uid, - meta, - }; - ret.push(meta); - } - - Ok(ret) - } - async fn handle_dump_info(&self, uid: String) -> DumpResult { match &*self.dump_info.lock().await { None => self.dump_from_fs(uid).await, @@ -234,3 +162,85 @@ where ) } } + +async fn perform_dump( + dump_path: PathBuf, + uuid_resolver: UuidResolver, + index: Index, + update: Update, + uid: String, +) -> anyhow::Result<()> +where + UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, + Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, + Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, +{ + info!("Performing dump."); + + let dump_dir = dump_path.clone(); + tokio::fs::create_dir_all(&dump_dir).await?; + let temp_dump_dir = + tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); + + let uuids = uuid_resolver.list().await?; + // maybe we could just keep the vec as-is + let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect(); + + if uuids.is_empty() { + return Ok(()); + } + + let indexes = list_indexes(&uuid_resolver, &index).await?; + + // we create one directory by index + for meta in indexes.iter() { + tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?; + } + + let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string()); + metadata.to_path(&temp_dump_path).await?; + + update.dump(uuids, temp_dump_path.clone()).await?; + + let dump_dir = dump_path.clone(); + let dump_path = dump_path.join(format!("{}.dump", uid)); + let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { + let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?; + let temp_dump_file_path = temp_dump_file.path().to_owned(); + compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?; + temp_dump_file.persist(&dump_path)?; + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + + Ok(()) +} + +async fn list_indexes( + uuid_resolver: &UuidResolver, + index: &Index, +) -> anyhow::Result> +where + UuidResolver: uuid_resolver::UuidResolverHandle, + Index: index_actor::IndexActorHandle, +{ + let uuids = uuid_resolver.list().await?; + + let mut ret = Vec::new(); + + for (uid, uuid) in uuids { + let meta = index.get_index_meta(uuid).await?; + let meta = IndexMetadata { + uuid, + name: uid.clone(), + uid, + meta, + }; + ret.push(meta); + } + + Ok(ret) +}