diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ee23eb487..8020ead32 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -1,6 +1,7 @@ name: Rust on: + workflow_dispatch: pull_request: push: # trying and staging branches are for Bors config diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 54d068f14..f576ce7a8 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -3,17 +3,15 @@ use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::sync::Arc; -use futures::StreamExt; use log::info; use oxidized_json_checker::JsonChecker; use tokio::fs; use tokio::io::AsyncWriteExt; -use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; -use crate::index_controller::{index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG}}; +use crate::index_controller::index_actor::{IndexActorHandle}; use crate::index_controller::{UpdateMeta, UpdateStatus}; pub struct UpdateActor { @@ -209,25 +207,8 @@ where async fn handle_snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); - tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - update_store.snapshot(&uuids, &path)?; - // Perform the snapshot of each index concurently. Only a third of the capabilities of - // the index actor at a time not to put too much pressure on the index actor - let path = &path; - let handle = &index_handle; - - let mut stream = futures::stream::iter(uuids.iter()) - .map(|&uuid| handle.snapshot(uuid, path.clone())) - .buffer_unordered(CONCURRENT_INDEX_MSG / 3); - - Handle::current().block_on(async { - while let Some(res) = stream.next().await { - res?; - } - Ok(()) - }) - }) + tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) .await .map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?; diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index 07dfdf273..f91a2740c 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -19,7 +19,7 @@ use tokio::sync::mpsc; use uuid::Uuid; use super::UpdateMeta; -use crate::helpers::EnvSizer; +use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult}; use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; #[allow(clippy::upper_case_acronyms)] @@ -525,7 +525,12 @@ impl UpdateStore { Ok(()) } - pub fn snapshot(&self, uuids: &HashSet, path: impl AsRef) -> anyhow::Result<()> { + pub fn snapshot( + &self, + uuids: &HashSet, + path: impl AsRef, + handle: impl IndexActorHandle + Clone, + ) -> anyhow::Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Snapshoting); @@ -557,6 +562,21 @@ impl UpdateStore { } } + let path = &path.as_ref().to_path_buf(); + let handle = &handle; + // Perform the snapshot of each index concurently. Only a third of the capabilities of + // the index actor at a time not to put too much pressure on the index actor + let mut stream = futures::stream::iter(uuids.iter()) + .map(move |uuid| handle.snapshot(*uuid, path.clone())) + .buffer_unordered(CONCURRENT_INDEX_MSG / 3); + + Handle::current().block_on(async { + while let Some(res) = stream.next().await { + res?; + } + Ok(()) as IndexResult<()> + })?; + Ok(()) } diff --git a/meilisearch-http/tests/snapshot/mod.rs b/meilisearch-http/tests/snapshot/mod.rs index caed293e6..b5602c508 100644 --- a/meilisearch-http/tests/snapshot/mod.rs +++ b/meilisearch-http/tests/snapshot/mod.rs @@ -7,7 +7,6 @@ use tokio::time::sleep; use meilisearch_http::Opt; -#[ignore] #[actix_rt::test] async fn perform_snapshot() { let temp = tempfile::tempdir_in(".").unwrap();